Merge pull request #118495 from MadhavJivrajani/cleanup-cacher-testing

storage: Cleanup cacher testing
This commit is contained in:
Kubernetes Prow Robot 2023-06-07 08:48:12 -07:00 committed by GitHub
commit a54748d652
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 114 deletions

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package tests
package cacher
import (
"context"
@ -24,7 +24,6 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -32,27 +31,18 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
const (
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
watchCacheDefaultCapacity = 100
@ -95,27 +85,10 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source
}
func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod,
prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
pagingEnabled,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) {
prefix := "pods"
v := storage.APIObjectVersioner{}
config := cacherstorage.Config{
config := Config{
Storage: s,
Versioner: v,
GroupResource: schema.GroupResource{Resource: "pods"},
@ -127,17 +100,10 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock,
}
cacher, err := cacherstorage.NewCacherFromConfig(config)
cacher, err := NewCacherFromConfig(config)
return cacher, v, err
}
func makeTestPod(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: storagetesting.DeepEqualSafePodSpec(),
}
}
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return obj.DeepCopyObject(), nil, nil
@ -410,16 +376,16 @@ func TestWatchDeprecated(t *testing.T) {
}
defer cacher.Stop()
podFoo := makeTestPod("foo")
podBar := makeTestPod("bar")
podFoo := makeTestPodWithName("foo")
podBar := makeTestPodWithName("bar")
podFooPrime := makeTestPod("foo")
podFooPrime := makeTestPodWithName("foo")
podFooPrime.Spec.NodeName = "fakeNode"
podFooBis := makeTestPod("foo")
podFooBis := makeTestPodWithName("foo")
podFooBis.Spec.NodeName = "anotherFakeNode"
podFooNS2 := makeTestPod("foo")
podFooNS2 := makeTestPodWithName("foo")
podFooNS2.Namespace += "2"
// initialVersion is used to initate the watcher at the beginning of the world,
@ -472,7 +438,7 @@ func TestWatchDeprecated(t *testing.T) {
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
for i := 0; i < watchCacheDefaultCapacity; i++ {
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
podFoo := makeTestPod(fmt.Sprintf("foo-%d", i))
podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i))
updatePod(t, etcdStorage, podFoo, nil)
}
@ -553,7 +519,7 @@ func withoutPaging(options *setupOptions) {
options.pagingEnabled = false
}
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) {
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts {
@ -567,7 +533,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
Errors: 1,
}
config := cacherstorage.Config{
config := Config{
Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
@ -580,7 +546,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock,
}
cacher, err := cacherstorage.NewCacherFromConfig(config)
cacher, err := NewCacherFromConfig(config)
if err != nil {
t.Fatalf("Failed to initialize cacher: %v", err)
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"testing"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
example2v1 "k8s.io/apiserver/pkg/apis/example2/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
errDummy = fmt.Errorf("dummy error")
)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(example.AddToScheme(scheme))
utilruntime.Must(examplev1.AddToScheme(scheme))
utilruntime.Must(example2v1.AddToScheme(scheme))
}
func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod,
prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
pagingEnabled,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func makeTestPodWithName(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: storagetesting.DeepEqualSafePodSpec(),
}
}
func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}

View File

@ -38,8 +38,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
@ -51,25 +49,11 @@ import (
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
errDummy = fmt.Errorf("dummy error")
)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(example.AddToScheme(scheme))
utilruntime.Must(examplev1.AddToScheme(scheme))
utilruntime.Must(example2v1.AddToScheme(scheme))
}
func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
prefix := "pods"
config := Config{
@ -312,7 +296,7 @@ func TestWatchCacheBypass(t *testing.T) {
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
defer server.Terminate(t)
// add a few objects
@ -762,27 +746,6 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
}
}
func TestWatchInitializationSignal(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
_, err = cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
initSignal.Wait()
}
func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBookmarks bool) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)

View File

@ -18,44 +18,17 @@ package cacher
import (
"context"
"fmt"
"testing"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
)
func newPod() runtime.Object { return &example.Pod{} }
func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod, prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
true,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func TestCacherListerWatcher(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
objects := []*example.Pod{
@ -89,7 +62,7 @@ func TestCacherListerWatcher(t *testing.T) {
func TestCacherListerWatcherPagination(t *testing.T) {
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
server, store := newEtcdTestStorage(t, prefix, true)
defer server.Terminate(t)
// We need the list to be sorted by name to later check the alphabetical order of