Refactor cacher.ListerWatcher code structure

This commit is contained in:
Wojciech Tyczyński 2023-04-25 19:03:20 +02:00
parent b925ce2446
commit 3f247e59ed
4 changed files with 222 additions and 132 deletions

View File

@ -400,7 +400,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
watchCache := newWatchCache( watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource) config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
@ -1336,54 +1336,6 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context,
return false, nil return false, nil
} }
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error // errWatcher implements watch.Interface to return a single error
type errWatcher struct { type errWatcher struct {
result chan watch.Event result chan watch.Event

View File

@ -0,0 +1,77 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
)
// listerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type listerWatcher struct {
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
// NewListerWatcher returns a storage.Interface backed ListerWatcher.
func NewListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &listerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}

View File

@ -0,0 +1,144 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"fmt"
"testing"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
)
func newPod() runtime.Object { return &example.Pod{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod, prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
true,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func TestCacherListerWatcher(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t)
objects := []*example.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}},
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
}
lw := NewListerWatcher(store, prefix, fn)
obj, err := lw.List(metav1.ListOptions{})
if err != nil {
t.Fatalf("List failed: %v", err)
}
pl, ok := obj.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", pl)
}
if len(pl.Items) != 3 {
t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
}
}
func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t)
// We need the list to be sorted by name to later check the alphabetical order of
// returned results.
objects := []*example.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "test-ns"}},
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}},
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
}
lw := NewListerWatcher(store, prefix, fn)
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit1, ok := obj1.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit1)
}
if len(limit1.Items) != 2 {
t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
}
if limit1.Continue == "" {
t.Errorf("Expected list to have Continue but got none")
}
obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit2, ok := obj2.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit2)
}
if limit2.Continue != "" {
t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
}
if limit1.Items[0].Name != objects[0].Name {
t.Errorf("Expected list1.Items[0] to be %s but got %s", objects[0].Name, limit1.Items[0].Name)
}
if limit1.Items[1].Name != objects[1].Name {
t.Errorf("Expected list1.Items[1] to be %s but got %s", objects[1].Name, limit1.Items[1].Name)
}
if limit2.Items[0].Name != objects[2].Name {
t.Errorf("Expected list2.Items[0] to be %s but got %s", objects[2].Name, limit2.Items[0].Name)
}
}

View File

@ -557,89 +557,6 @@ func TestEmptyWatchEventCache(t *testing.T) {
} }
} }
func TestCacherListerWatcher(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podBaz := makeTestPod("baz")
_ = updatePod(t, store, podFoo, nil)
_ = updatePod(t, store, podBar, nil)
_ = updatePod(t, store, podBaz, nil)
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
obj, err := lw.List(metav1.ListOptions{})
if err != nil {
t.Fatalf("List failed: %v", err)
}
pl, ok := obj.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", pl)
}
if len(pl.Items) != 3 {
t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
}
}
func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podBaz := makeTestPod("baz")
_ = updatePod(t, store, podFoo, nil)
_ = updatePod(t, store, podBar, nil)
_ = updatePod(t, store, podBaz, nil)
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit1, ok := obj1.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit1)
}
if len(limit1.Items) != 2 {
t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
}
if limit1.Continue == "" {
t.Errorf("Expected list to have Continue but got none")
}
obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
if err != nil {
t.Fatalf("List failed: %v", err)
}
limit2, ok := obj2.(*example.PodList)
if !ok {
t.Fatalf("Expected PodList but got %v", limit2)
}
if limit2.Continue != "" {
t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
}
if limit1.Items[0].Name != podBar.Name {
t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name)
}
if limit1.Items[1].Name != podBaz.Name {
t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name)
}
if limit2.Items[0].Name != podFoo.Name {
t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name)
}
}
func TestWatchDispatchBookmarkEvents(t *testing.T) { func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, cacher, terminate := testSetup(t) ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate) t.Cleanup(terminate)