Merge pull request #117584 from wojtek-t/move_cacher_lister

Refactor cacher.ListerWatcher code structure
This commit is contained in:
Kubernetes Prow Robot 2023-04-25 12:04:16 -07:00 committed by GitHub
commit e9e60316d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 222 additions and 132 deletions

View File

@ -400,7 +400,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
@ -1336,54 +1336,6 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context,
return false, nil
}
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event

View File

@ -0,0 +1,77 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
)
// listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type listerWatcher struct {
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
// NewListerWatcher returns a storage.Interface backed ListerWatcher.
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &listerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}

View File

@ -0,0 +1,144 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"fmt"
"testing"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
)
func newPod() runtime.Object { return &example.Pod{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod, prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
true,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func TestCacherListerWatcher(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t)
objects := []*example.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}},
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
}
lw := NewListerWatcher(store, prefix, fn)
obj, err := lw.List(metav1.ListOptions{})
if err != nil {
t.Fatalf("List failed: %v", err)
}
pl, ok := obj.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", pl)
}
if len(pl.Items) != 3 {
t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
}
}
func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t)
// We need the list to be sorted by name to later check the alphabetical order of
// returned results.
objects := []*example.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}},
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
}
lw := NewListerWatcher(store, prefix, fn)
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit1, ok := obj1.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit1)
}
if len(limit1.Items) != 2 {
t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
}
if limit1.Continue == "" {
t.Errorf("Expected list to have Continue but got none")
}
obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit2, ok := obj2.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit2)
}
if limit2.Continue != "" {
t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
}
if limit1.Items[0].Name != objects[0].Name {
t.Errorf("Expected list1.Items[0] to be %s but got %s", objects[0].Name, limit1.Items[0].Name)
}
if limit1.Items[1].Name != objects[1].Name {
t.Errorf("Expected list1.Items[1] to be %s but got %s", objects[1].Name, limit1.Items[1].Name)
}
if limit2.Items[0].Name != objects[2].Name {
t.Errorf("Expected list2.Items[0] to be %s but got %s", objects[2].Name, limit2.Items[0].Name)
}
}

View File

@ -557,89 +557,6 @@ func TestEmptyWatchEventCache(t *testing.T) {
}
}
func TestCacherListerWatcher(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podBaz := makeTestPod("baz")
_ = updatePod(t, store, podFoo, nil)
_ = updatePod(t, store, podBar, nil)
_ = updatePod(t, store, podBaz, nil)
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
obj, err := lw.List(metav1.ListOptions{})
if err != nil {
t.Fatalf("List failed: %v", err)
}
pl, ok := obj.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", pl)
}
if len(pl.Items) != 3 {
t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
}
}
func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podBaz := makeTestPod("baz")
_ = updatePod(t, store, podFoo, nil)
_ = updatePod(t, store, podBar, nil)
_ = updatePod(t, store, podBaz, nil)
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit1, ok := obj1.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit1)
}
if len(limit1.Items) != 2 {
t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
}
if limit1.Continue == "" {
t.Errorf("Expected list to have Continue but got none")
}
obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit2, ok := obj2.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit2)
}
if limit2.Continue != "" {
t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
}
if limit1.Items[0].Name != podBar.Name {
t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name)
}
if limit1.Items[1].Name != podBaz.Name {
t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name)
}
if limit2.Items[0].Name != podFoo.Name {
t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name)
}
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)