Refactor some watchcache tests

Replace the test-local testVersioner with the production
storage.APIObjectVersioner, compute pod keys through a shared
computePodKey helper, and rewrite TestEmptyWatchEventCache as a
table-driven test alongside the other cacher whitebox tests.

Wojciech Tyczyński 2023-04-27 11:59:33 +02:00
parent 78b56ce16d
commit 1eca720dcc
4 changed files with 105 additions and 125 deletions

View File

@@ -62,7 +62,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
// set the size of the buffer of w.result to 0, so that writes to
// w.result are blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
@@ -182,7 +182,7 @@ TestCase:
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0)
ch := w.ResultChan()
@@ -219,7 +219,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
// timeout to zero and run the Stop goroutine concurrently.
// Make sure that the watch will not be blocked on Stop.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "")
go w.Stop()
select {
case <-done:
@@ -231,7 +231,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
deadline := time.Now().Add(time.Hour)
// After that, verify that the cacheWatcher.process goroutine works correctly.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(2, filter, emptyFunc, storage.APIObjectVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
@@ -308,7 +308,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
forget := func(_ bool) {}
deadline := time.Now().Add(time.Minute)
w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
// Simulate a situation where the last event, which was already in
// the state, wasn't yet processed by the cacher and will be delivered
@@ -351,7 +351,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
forget := func(bool) {}
newWatcher := func(deadline time.Time) *cacheWatcher {
w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(0)
return w
}
@@ -418,7 +418,7 @@ func TestCacheWatcherDraining(t *testing.T) {
makeWatchCacheEvent(5),
makeWatchCacheEvent(6),
}
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an event to the watcher")
@@ -459,7 +459,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
makeWatchCacheEvent(5),
makeWatchCacheEvent(6),
}
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an event to the watcher")
@@ -496,7 +496,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T
{Object: &v1.Pod{}},
{Object: &v1.Pod{}},
}
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(10)
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0)
if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) {
@@ -542,7 +542,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
w.stopLocked()
}
initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}}
w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w = newCacheWatcher(2, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "")
w.setBookmarkAfterResourceVersion(10)
go w.processInterval(ctx, intervalFromEvents(initEvents), 0)
watchInitializationSignal.Wait()
@@ -596,7 +596,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) {
func TestBookmarkAfterResourceVersionWatchers(t *testing.T) {
newWatcher := func(id string, deadline time.Time) *cacheWatcher {
w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id)
w.setBookmarkAfterResourceVersion(10)
return w
}
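
All of the changes in this file swap the test-local testVersioner for the production storage.APIObjectVersioner, which already implements the storage.Versioner interface that newCacheWatcher accepts. A minimal standalone sketch of why the swap is safe for the parsing these tests rely on; the main function and printed values are illustrative, not part of the commit:

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage"
)

func main() {
	// APIObjectVersioner satisfies storage.Versioner, so it can stand in
	// for the deleted test double at every newCacheWatcher call site.
	var v storage.Versioner = storage.APIObjectVersioner{}

	// As with testVersioner, an empty resource version parses to zero...
	rv, err := v.ParseResourceVersion("")
	fmt.Println(rv, err) // 0 <nil>

	// ...and a numeric one parses as a base-10 uint64.
	rv, err = v.ParseResourceVersion("42")
	fmt.Println(rv, err) // 42 <nil>
}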

View File

@@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -58,42 +57,6 @@ import (
"k8s.io/utils/pointer"
)
type testVersioner struct{}
func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
}
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, count *int64) error {
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
}
listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
listAccessor.SetContinue(continueValue)
listAccessor.SetRemainingItemCount(count)
return nil
}
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return 0, err
}
version := accessor.GetResourceVersion()
if len(version) == 0 {
return 0, nil
}
return strconv.ParseUint(version, 10, 64)
}
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
if len(resourceVersion) == 0 {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
}
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
@@ -111,7 +74,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
prefix := "pods"
config := Config{
Storage: s,
Versioner: testVersioner{},
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
@@ -133,7 +96,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
Clock: clock.RealClock{},
}
cacher, err := NewCacherFromConfig(config)
return cacher, testVersioner{}, err
return cacher, storage.APIObjectVersioner{}, err
}
type dummyStorage struct {
@@ -348,6 +311,90 @@ func TestWatchCacheBypass(t *testing.T) {
}
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
// add a few objects
v := storage.APIObjectVersioner{}
lastRV := uint64(0)
for i := 0; i < 5; i++ {
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}}
out := &example.Pod{}
key := computePodKey(pod)
if err := etcdStorage.Create(context.Background(), key, pod, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
var err error
if lastRV, err = v.ParseResourceVersion(out.ResourceVersion); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
cacher, _, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// Given that the cacher is always initialized from the "current" version of etcd,
// we now have a cacher with an empty cache of watch events and a resourceVersion of lastRV.
// It should support establishing watches from lastRV and higher, but not older.
expectedResourceExpiredError := apierrors.NewResourceExpired("").ErrStatus
tests := []struct {
name string
resourceVersion uint64
expectedEvent *watch.Event
}{
{
name: "RV-1",
resourceVersion: lastRV - 1,
expectedEvent: &watch.Event{Type: watch.Error, Object: &expectedResourceExpiredError},
},
{
name: "RV",
resourceVersion: lastRV,
},
{
name: "RV+1",
resourceVersion: lastRV + 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := storage.ListOptions{
ResourceVersion: strconv.Itoa(int(tt.resourceVersion)),
Predicate: storage.Everything,
}
watcher, err := cacher.Watch(context.Background(), "/pods/test-ns", opts)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer watcher.Stop()
select {
case event := <-watcher.ResultChan():
if tt.expectedEvent == nil {
t.Errorf("Unexpected event: type=%#v, object=%#v", event.Type, event.Object)
break
}
if e, a := tt.expectedEvent.Type, event.Type; e != a {
t.Errorf("Expected: %s, got: %s", e, a)
}
if e, a := tt.expectedEvent.Object, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a)
}
case <-time.After(3 * time.Second):
if tt.expectedEvent != nil {
t.Errorf("Failed to get an event")
}
// watch remained established successfully
}
})
}
}
func TestWatchNotHangingOnStartupFailure(t *testing.T) {
// Configure the cacher so that it can't initialize, because of
// constantly failing lists to the underlying storage.
@@ -378,7 +425,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) {
func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
cacher, v, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
@@ -448,7 +495,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
shouldContinue = false
break
}
rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
rv, err := v.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion())
if err != nil {
t.Errorf("unexpected parsing error: %v", err)
} else {
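
The rewritten TestEmptyWatchEventCache above pins down the cacher's contract for an empty watch cache: a watch started from lastRV-1 receives a watch.Error event carrying a ResourceExpired status, while watches from lastRV and lastRV+1 remain established. A hedged sketch of how a client could recognize that error event; the package and helper names are hypothetical:

package watchutil

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// isResourceExpired reports whether a watch event carries the
// "resource version too old" status that the cacher emits when a watch
// is requested from a version older than what it can serve.
func isResourceExpired(event watch.Event) bool {
	if event.Type != watch.Error {
		return false
	}
	status, ok := event.Object.(*metav1.Status)
	return ok && status.Reason == metav1.StatusReasonExpired
}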

View File

@@ -35,6 +35,10 @@ import (
func newPod() runtime.Object { return &example.Pod{} }
func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
@@ -61,7 +65,7 @@ func TestCacherListerWatcher(t *testing.T) {
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
key := computePodKey(obj)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
@@ -97,7 +101,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
}
for _, obj := range objects {
out := &example.Pod{}
key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
key := computePodKey(obj)
if err := store.Create(context.Background(), key, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
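
computePodKey gives these tests a single definition of the "/pods/<namespace>/<name>" key layout instead of a fmt.Sprintf repeated at every Create call site. A self-contained sketch that duplicates the helper for illustration; the main function is not part of the commit:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/apis/example"
)

// computePodKey mirrors the helper added in this file: one place that
// encodes the key layout used by the etcd test storage.
func computePodKey(obj *example.Pod) string {
	return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
}

func main() {
	pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
	fmt.Println(computePodKey(pod)) // /pods/test-ns/foo
}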

View File

@@ -112,10 +112,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
return server, storage
}
func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) {
return newTestCacherWithClock(s, clock.RealClock{})
}
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
prefix := "pods"
v := storage.APIObjectVersioner{}
@@ -490,73 +486,6 @@ func TestWatchDeprecated(t *testing.T) {
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
}
func TestEmptyWatchEventCache(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
defer server.Terminate(t)
// add a few objects
updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
cacher, v, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// get rv of last pod created
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
// It should support establishing watches from rv and higher, but not older.
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError)
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
select {
case e := <-watcher.ResultChan():
t.Errorf("unexpected event %#v", e)
case <-time.After(3 * time.Second):
// watch from rv+1 remained established successfully
}
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
select {
case e := <-watcher.ResultChan():
t.Errorf("unexpected event %#v", e)
case <-time.After(3 * time.Second):
// watch from rv remained established successfully
}
}
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
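
With TestEmptyWatchEventCache moved to the whitebox tests, the thin newTestCacher wrapper in this file loses its last caller and is deleted; remaining tests construct the cacher through newTestCacherWithClock directly. An illustrative sketch under that assumption, reusing this file's existing helpers (the test name is hypothetical):

func TestExampleWithRealClock(t *testing.T) {
	server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
	defer server.Terminate(t)
	// The clock argument replaces what the deleted wrapper hard-coded.
	cacher, _, err := newTestCacherWithClock(etcdStorage, clock.RealClock{})
	if err != nil {
		t.Fatalf("Couldn't create cacher: %v", err)
	}
	defer cacher.Stop()
}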