mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-01 09:18:45 +00:00
Merge pull request #108939 from stevekuznetsov/skuznets/precise-watch
pkg/storage/etcd3: be more precise in watch test
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||||
@@ -293,24 +294,26 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
etcdW := client.Watch(ctx, "/", clientv3.WithPrefix())
|
rv, err := APIObjectVersioner{}.ObjectResourceVersion(storedObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to parse resourceVersion on stored object: %v", err)
|
||||||
|
}
|
||||||
|
etcdW := client.Watch(ctx, key, clientv3.WithRev(int64(rv)))
|
||||||
|
|
||||||
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||||
t.Fatalf("Delete failed: %v", err)
|
t.Fatalf("Delete failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var e watch.Event
|
var e watch.Event
|
||||||
var wres clientv3.WatchResponse
|
|
||||||
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
watchCtx, _ := context.WithTimeout(ctx, wait.ForeverTestTimeout)
|
||||||
select {
|
select {
|
||||||
case e = <-w.ResultChan():
|
case e = <-w.ResultChan():
|
||||||
case <-watchCtx.Done():
|
case <-watchCtx.Done():
|
||||||
t.Fatalf("timed out waiting for watch event")
|
t.Fatalf("timed out waiting for watch event")
|
||||||
}
|
}
|
||||||
select {
|
deletedRV, err := deletedRevision(watchCtx, etcdW)
|
||||||
case wres = <-etcdW:
|
if err != nil {
|
||||||
case <-watchCtx.Done():
|
t.Fatalf("did not see delete event in raw watch: %v", err)
|
||||||
t.Fatalf("timed out waiting for raw watch event")
|
|
||||||
}
|
}
|
||||||
watchedDeleteObj := e.Object.(*example.Pod)
|
watchedDeleteObj := e.Object.(*example.Pod)
|
||||||
|
|
||||||
@@ -318,9 +321,24 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||||
}
|
}
|
||||||
if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision {
|
if int64(watchedDeleteRev) != deletedRV {
|
||||||
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
||||||
watchedDeleteRev, wres.Events[0].Kv.ModRevision)
|
watchedDeleteRev, deletedRV)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func deletedRevision(ctx context.Context, watch <-chan clientv3.WatchResponse) (int64, error) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return 0, ctx.Err()
|
||||||
|
case wres := <-watch:
|
||||||
|
for _, evt := range wres.Events {
|
||||||
|
if evt.Type == mvccpb.DELETE && evt.Kv != nil {
|
||||||
|
return evt.Kv.ModRevision, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user