Refactor watch bookmark tests to allow sharing between etcd3 and watchcache

This commit is contained in:
Wojciech Tyczyński 2023-04-17 14:22:52 +02:00
parent 94a15929cf
commit 0297329795
3 changed files with 184 additions and 157 deletions

View File

@ -36,12 +36,12 @@ func TestWatch(t *testing.T) {
func TestClusterScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.TestClusterScopedWatch(ctx, t, store)
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
}
func TestNamespaceScopedWatch(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.TestNamespaceScopedWatch(ctx, t, store)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
}
func TestDeleteTriggerWatch(t *testing.T) {

View File

@ -19,6 +19,7 @@ package testing
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -376,7 +377,7 @@ func RunOptionalTestProgressNotify(ctx context.Context, t *testing.T, store stor
}
// It tests watches of cluster-scoped resources.
func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
func RunTestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
tests := []struct {
name string
// For watch request, the name of object is specified with field selector
@ -530,7 +531,7 @@ func TestClusterScopedWatch(ctx context.Context, t *testing.T, store storage.Int
}
// It tests watch of namespace-scoped resources.
func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
func RunTestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.Interface) {
tests := []struct {
name string
// For watch request, the name of object is specified with field selector
@ -844,6 +845,177 @@ func TestNamespaceScopedWatch(ctx context.Context, t *testing.T, store storage.I
}
}
// RunOptionalTestWatchDispatchBookmarkEvents tests whether bookmark events are sent.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestWatchDispatchBookmarkEvents(ctx context.Context, t *testing.T, store storage.Interface) {
key, storedObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
startRV := storedObj.ResourceVersion
tests := []struct {
name string
timeout time.Duration
expected bool
allowWatchBookmarks bool
}{
{ // test old client won't get Bookmark event
name: "allowWatchBookmarks=false",
timeout: 3 * time.Second,
expected: false,
allowWatchBookmarks: false,
},
{
name: "allowWatchBookmarks=true",
timeout: 3 * time.Second,
expected: true,
allowWatchBookmarks: true,
},
}
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = tt.allowWatchBookmarks
ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
defer cancel()
watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
// Create events of pods in a different namespace
out := &example.Pod{}
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: fmt.Sprintf("other-ns-%d", i)}}
objKey := computePodKey(obj)
if err := store.Create(ctx, objKey, obj, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
// Now wait for Bookmark event
select {
case event, ok := <-watcher.ResultChan():
if !ok && tt.expected {
t.Errorf("Unexpected object watched (no objects)")
}
if tt.expected && event.Type != watch.Bookmark {
t.Errorf("Unexpected object watched %#v", event)
}
case <-time.After(time.Second * 3):
if tt.expected {
t.Errorf("Unexpected object watched (timeout)")
}
}
})
}
}
// RunOptionalTestWatchBookmarksWithCorrectResourceVersion tests whether bookmark events are
// sent with correct resource versions.
// This feature is currently implemented in watch cache layer, so this is optional.
//
// TODO(#109831): ProgressNotify feature is effectively implementing the same
//
// functionality, so we should refactor this functionality to share the same input.
func RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx context.Context, t *testing.T, store storage.Interface) {
// Compute the initial resource version.
list := &example.PodList{}
storageOpts := storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
}
if err := store.GetList(ctx, "/pods", storageOpts, list); err != nil {
t.Errorf("Unexpected error: %v", err)
}
startRV := list.ResourceVersion
key := "/pods/test-ns"
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
watcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: startRV, Predicate: pred, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
done := make(chan struct{})
errc := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
// We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers.
defer wg.Wait()
// Call close first, so the goroutine knows to exit.
defer close(done)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
select {
case <-done:
return
default:
out := &example.Pod{}
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("foo-%d", i),
Namespace: "test-ns",
},
}
podKey := computePodKey(pod)
if err := store.Create(ctx, podKey, pod, out, 0); err != nil {
errc <- fmt.Errorf("failed to create pod %v: %v", pod, err)
return
}
time.Sleep(10 * time.Millisecond)
}
}
}()
bookmarkReceived := false
lastObservedResourceVersion := uint64(0)
for {
select {
case err := <-errc:
t.Fatal(err)
case event, ok := <-watcher.ResultChan():
if !ok {
// Make sure we have received a bookmark event
if !bookmarkReceived {
t.Fatalf("Unpexected error, we did not received a bookmark event")
}
return
}
rv, err := storage.APIObjectVersioner{}.ObjectResourceVersion(event.Object)
if err != nil {
t.Fatalf("failed to parse resourceVersion from %#v", event)
}
if event.Type == watch.Bookmark {
bookmarkReceived = true
// bookmark event has a RV greater than or equal to the before one
if rv < lastObservedResourceVersion {
t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion)
}
} else {
// non-bookmark event has a RV greater than anything before
if rv <= lastObservedResourceVersion {
t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion)
}
}
lastObservedResourceVersion = rv
}
}
}
type testWatchStruct struct {
obj *example.Pod
expectEvent bool

View File

@ -21,7 +21,6 @@ import (
"fmt"
goruntime "runtime"
"strconv"
"sync"
"testing"
"time"
@ -40,15 +39,12 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
)
@ -139,12 +135,6 @@ func makeTestPod(name string) *example.Pod {
}
}
func createPod(s storage.Interface, obj *example.Pod) error {
key := "pods/" + obj.Namespace + "/" + obj.Name
out := &example.Pod{}
return s.Create(context.TODO(), key, obj, out, 0)
}
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return obj.DeepCopyObject(), nil, nil
@ -182,13 +172,13 @@ func TestList(t *testing.T) {
func TestClusterScopedWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withClusterScopedKeyFunc, withSpecNodeNameIndexerFuncs)
t.Cleanup(terminate)
storagetesting.TestClusterScopedWatch(ctx, t, cacher)
storagetesting.RunTestClusterScopedWatch(ctx, t, cacher)
}
func TestNamespaceScopedWatch(t *testing.T) {
ctx, cacher, terminate := testSetup(t, withSpecNodeNameIndexerFuncs)
t.Cleanup(terminate)
storagetesting.TestNamespaceScopedWatch(ctx, t, cacher)
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
}
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
@ -618,150 +608,15 @@ func TestCacherListerWatcherPagination(t *testing.T) {
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(rv))
tests := []struct {
timeout time.Duration
expected bool
allowWatchBookmark bool
}{
{ // test old client won't get Bookmark event
timeout: 3 * time.Second,
expected: false,
allowWatchBookmark: false,
},
{
timeout: 3 * time.Second,
expected: true,
allowWatchBookmark: true,
},
}
for i, c := range tests {
pred := storage.Everything
pred.AllowWatchBookmarks = c.allowWatchBookmark
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
t.Cleanup(cancel)
watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create events of other pods
updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil)
// Now wait for Bookmark event
select {
case event, ok := <-watcher.ResultChan():
if !ok && c.expected {
t.Errorf("Unexpected object watched (no objects)")
}
if c.expected && event.Type != watch.Bookmark {
t.Errorf("Unexpected object watched %#v", event)
}
case <-time.After(time.Second * 3):
if c.expected {
t.Errorf("Unexpected object watched (timeout)")
}
}
watcher.Stop()
}
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, cacher)
}
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
defer server.Terminate(t)
cacher, v, err := newTestCacher(etcdStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
watcher, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
done := make(chan struct{})
errc := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait() // We must wait for the waitgroup to exit before we terminate the cache or the server in prior defers
defer close(done) // call close first, so the goroutine knows to exit
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
select {
case <-done:
return
default:
pod := fmt.Sprintf("foo-%d", i)
err := createPod(etcdStorage, makeTestPod(pod))
if err != nil {
errc <- fmt.Errorf("failed to create pod %v: %v", pod, err)
return
}
time.Sleep(time.Second / 100)
}
}
}()
bookmarkReceived := false
lastObservedResourceVersion := uint64(0)
for {
select {
case err := <-errc:
t.Fatal(err)
case event, ok := <-watcher.ResultChan():
if !ok {
// Make sure we have received a bookmark event
if !bookmarkReceived {
t.Fatalf("Unpexected error, we did not received a bookmark event")
}
return
}
rv, err := v.ObjectResourceVersion(event.Object)
if err != nil {
t.Fatalf("failed to parse resourceVersion from %#v", event)
}
if event.Type == watch.Bookmark {
bookmarkReceived = true
// bookmark event has a RV greater than or equal to the before one
if rv < lastObservedResourceVersion {
t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion)
}
} else {
// non-bookmark event has a RV greater than anything before
if rv <= lastObservedResourceVersion {
t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion)
}
}
lastObservedResourceVersion = rv
}
}
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher)
}
// ===================================================