diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 4453b5cf1d3..5e5aa572d33 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -400,7 +400,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { watchCache := newWatchCache( config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource) - listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) + listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) @@ -1336,54 +1336,6 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, return false, nil } -// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. -type cacherListerWatcher struct { - storage storage.Interface - resourcePrefix string - newListFunc func() runtime.Object -} - -// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher. -func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { - return &cacherListerWatcher{ - storage: storage, - resourcePrefix: resourcePrefix, - newListFunc: newListFunc, - } -} - -// Implements cache.ListerWatcher interface. -func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { - list := lw.newListFunc() - pred := storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.Everything(), - Limit: options.Limit, - Continue: options.Continue, - } - - storageOpts := storage.ListOptions{ - ResourceVersionMatch: options.ResourceVersionMatch, - Predicate: pred, - Recursive: true, - } - if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil { - return nil, err - } - return list, nil -} - -// Implements cache.ListerWatcher interface. -func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - opts := storage.ListOptions{ - ResourceVersion: options.ResourceVersion, - Predicate: storage.Everything, - Recursive: true, - ProgressNotify: true, - } - return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts) -} - // errWatcher implements watch.Interface to return a single error type errWatcher struct { result chan watch.Event diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go new file mode 100644 index 00000000000..1252e5e3495 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go @@ -0,0 +1,77 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/tools/cache" +) + +// listerWatcher opaques storage.Interface to expose cache.ListerWatcher. +type listerWatcher struct { + storage storage.Interface + resourcePrefix string + newListFunc func() runtime.Object +} + +// NewListerWatcher returns a storage.Interface backed ListerWatcher. +func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher { + return &listerWatcher{ + storage: storage, + resourcePrefix: resourcePrefix, + newListFunc: newListFunc, + } +} + +// Implements cache.ListerWatcher interface. +func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { + list := lw.newListFunc() + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: options.Limit, + Continue: options.Continue, + } + + storageOpts := storage.ListOptions{ + ResourceVersionMatch: options.ResourceVersionMatch, + Predicate: pred, + Recursive: true, + } + if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil { + return nil, err + } + return list, nil +} + +// Implements cache.ListerWatcher interface. +func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + opts := storage.ListOptions{ + ResourceVersion: options.ResourceVersion, + Predicate: storage.Everything, + Recursive: true, + ProgressNotify: true, + } + return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go new file mode 100644 index 00000000000..44a55877879 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "context" + "fmt" + "testing" + + "k8s.io/apimachinery/pkg/api/apitesting" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/value/encrypt/identity" +) + +func newPod() runtime.Object { return &example.Pod{} } + +func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { + server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + storage := etcd3.New( + server.V3Client, + apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), + newPod, prefix, + schema.GroupResource{Resource: "pods"}, + identity.NewEncryptCheckTransformer(), + true, + etcd3.NewDefaultLeaseManagerConfig()) + return server, storage +} + +func TestCacherListerWatcher(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + objects := []*example.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, + } + for _, obj := range objects { + out := &example.Pod{} + key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) + if err := store.Create(context.Background(), key, obj, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + } + + lw := NewListerWatcher(store, prefix, fn) + + obj, err := lw.List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + pl, ok := obj.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %v", pl) + } + if len(pl.Items) != 3 { + t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items)) + } +} + +func TestCacherListerWatcherPagination(t *testing.T) { + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + // We need the list to be sorted by name to later check the alphabetical order of + // returned results. + objects := []*example.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, + } + for _, obj := range objects { + out := &example.Pod{} + key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) + if err := store.Create(context.Background(), key, obj, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + } + + lw := NewListerWatcher(store, prefix, fn) + + obj1, err := lw.List(metav1.ListOptions{Limit: 2}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit1, ok := obj1.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %v", limit1) + } + if len(limit1.Items) != 2 { + t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items)) + } + if limit1.Continue == "" { + t.Errorf("Expected list to have Continue but got none") + } + obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + limit2, ok := obj2.(*example.PodList) + if !ok { + t.Fatalf("Expected PodList but got %v", limit2) + } + if limit2.Continue != "" { + t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue) + } + + if limit1.Items[0].Name != objects[0].Name { + t.Errorf("Expected list1.Items[0] to be %s but got %s", objects[0].Name, limit1.Items[0].Name) + } + if limit1.Items[1].Name != objects[1].Name { + t.Errorf("Expected list1.Items[1] to be %s but got %s", objects[1].Name, limit1.Items[1].Name) + } + if limit2.Items[0].Name != objects[2].Name { + t.Errorf("Expected list2.Items[0] to be %s but got %s", objects[2].Name, limit2.Items[0].Name) + } + +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 60017e3b374..9e23687cf8d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -557,89 +557,6 @@ func TestEmptyWatchEventCache(t *testing.T) { } } -func TestCacherListerWatcher(t *testing.T) { - prefix := "pods" - fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix, true) - defer server.Terminate(t) - - podFoo := makeTestPod("foo") - podBar := makeTestPod("bar") - podBaz := makeTestPod("baz") - - _ = updatePod(t, store, podFoo, nil) - _ = updatePod(t, store, podBar, nil) - _ = updatePod(t, store, podBaz, nil) - - lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) - - obj, err := lw.List(metav1.ListOptions{}) - if err != nil { - t.Fatalf("List failed: %v", err) - } - pl, ok := obj.(*example.PodList) - if !ok { - t.Fatalf("Expected PodList but got %v", pl) - } - if len(pl.Items) != 3 { - t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items)) - } -} - -func TestCacherListerWatcherPagination(t *testing.T) { - prefix := "pods" - fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix, true) - defer server.Terminate(t) - - podFoo := makeTestPod("foo") - podBar := makeTestPod("bar") - podBaz := makeTestPod("baz") - - _ = updatePod(t, store, podFoo, nil) - _ = updatePod(t, store, podBar, nil) - _ = updatePod(t, store, podBaz, nil) - - lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn) - - obj1, err := lw.List(metav1.ListOptions{Limit: 2}) - if err != nil { - t.Fatalf("List failed: %v", err) - } - limit1, ok := obj1.(*example.PodList) - if !ok { - t.Fatalf("Expected PodList but got %v", limit1) - } - if len(limit1.Items) != 2 { - t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items)) - } - if limit1.Continue == "" { - t.Errorf("Expected list to have Continue but got none") - } - obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue}) - if err != nil { - t.Fatalf("List failed: %v", err) - } - limit2, ok := obj2.(*example.PodList) - if !ok { - t.Fatalf("Expected PodList but got %v", limit2) - } - if limit2.Continue != "" { - t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue) - } - - if limit1.Items[0].Name != podBar.Name { - t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name) - } - if limit1.Items[1].Name != podBaz.Name { - t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name) - } - if limit2.Items[0].Name != podFoo.Name { - t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name) - } - -} - func TestWatchDispatchBookmarkEvents(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate)