From 2631c0a0f959bd67aa455045dce33e77150ab5f8 Mon Sep 17 00:00:00 2001 From: Steve Kuznetsov Date: Wed, 23 Mar 2022 12:59:53 -0800 Subject: [PATCH] pkg/storage/etcd3: be more precise in watch test Previously, this test assumed that: - a global watch would return only an event for the key in question - only the delete event in question would be returned Neither of these assumptions are correct for an etcd backend as long as any other clients are interacting with the system. This commit makes the watch more specific and extracts the correct event. Signed-off-by: Steve Kuznetsov --- .../pkg/storage/etcd3/watcher_test.go | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 39f4cb75497..eb3d76fc379 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/apimachinery/pkg/api/apitesting" @@ -293,24 +294,26 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - etcdW := client.Watch(ctx, "/", clientv3.WithPrefix()) + rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj) + if err != nil { + t.Fatalf("failed to parse resourceVersion on stored object: %v", err) + } + etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv))) if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { t.Fatalf("Delete failed: %v", err) } var e watch.Event - var wres clientv3.WatchResponse watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) select { case e = <-w.ResultChan(): case <-watchCtx.Done(): t.Fatalf("timed out waiting for watch event") } - select { - case wres = <-etcdW: - case <-watchCtx.Done(): - t.Fatalf("timed out waiting for raw watch event") + deletedRV, err := deletedRevision(watchCtx, etcdW) + if err != nil { + t.Fatalf("did not see delete event in raw watch: %v", err) } watchedDeleteObj := e.Object.(*example.Pod) @@ -318,9 +321,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { if err != nil { t.Fatalf("ParseWatchResourceVersion failed: %v", err) } - if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { + if int64(watchedDeleteRev) != deletedRV { t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", - watchedDeleteRev, wres.Events[0].Kv.ModRevision) + watchedDeleteRev, deletedRV) + } +} + +func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) { + for { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case wres := <-watch: + for _, evt := range wres.Events { + if evt.Type == mvccpb.DELETE && evt.Kv != nil { + return evt.Kv.ModRevision, nil + } + } + } } }