diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 39f4cb75497..eb3d76fc379 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/apimachinery/pkg/api/apitesting" @@ -293,24 +294,26 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { if err != nil { t.Fatalf("Watch failed: %v", err) } - etcdW := client.Watch(ctx, "/", clientv3.WithPrefix()) + rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj) + if err != nil { + t.Fatalf("failed to parse resourceVersion on stored object: %v", err) + } + etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv))) if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { t.Fatalf("Delete failed: %v", err) } var e watch.Event - var wres clientv3.WatchResponse watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout) select { case e = <-w.ResultChan(): case <-watchCtx.Done(): t.Fatalf("timed out waiting for watch event") } - select { - case wres = <-etcdW: - case <-watchCtx.Done(): - t.Fatalf("timed out waiting for raw watch event") + deletedRV, err := deletedRevision(watchCtx, etcdW) + if err != nil { + t.Fatalf("did not see delete event in raw watch: %v", err) } watchedDeleteObj := e.Object.(*example.Pod) @@ -318,9 +321,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { if err != nil { t.Fatalf("ParseWatchResourceVersion failed: %v", err) } - if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision { + if int64(watchedDeleteRev) != deletedRV { t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d", - watchedDeleteRev, wres.Events[0].Kv.ModRevision) + watchedDeleteRev, deletedRV) + } +} + +func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) { + for { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case wres := <-watch: + for _, evt := range wres.Events { + if evt.Type == mvccpb.DELETE && evt.Kv != nil { + return evt.Kv.ModRevision, nil + } + } + } } }