mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Merge pull request #25396 from hongchaodeng/w
Automatic merge from submit-queue etcd3/watcher: Event.Object should have the same rev as etcd delete ### What's the problem? When a delete is watched, the revision should be larger than any previous to guarantee ordering. However, currently etcd3 decodes the previous rev into returned object:995f022808/pkg/storage/etcd3/watcher.go (L322)
This will break, for example, cacher's assumption here if it re-watch.995f022808/pkg/storage/cacher.go (L579-L581)
The etcd2 impl. also takes the current ModifiedIndex to ensure it's a larger number:995f022808/pkg/storage/etcd/etcd_watcher.go (L437-L442)
### What's this PR? It fixes above problem by using etcd's delete revision.
This commit is contained in:
commit
d7cf4e60cb
@ -88,7 +88,7 @@ func TestCreateWithTTL(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, 0, watch.Deleted, w, nil)
|
testCheckEventType(t, watch.Deleted, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateWithKeyExist(t *testing.T) {
|
func TestCreateWithKeyExist(t *testing.T) {
|
||||||
@ -396,7 +396,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, 0, watch.Deleted, w, nil)
|
testCheckEventType(t, watch.Deleted, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||||
|
@ -322,7 +322,11 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, getResp.Kvs[0].ModRevision)
|
// Note that this sends the *old* object with the etcd revision for the time at
|
||||||
|
// which it gets deleted.
|
||||||
|
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need
|
||||||
|
// to have larger than previous rev to tell the ordering.
|
||||||
|
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,11 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"sync"
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
|
||||||
"github.com/coreos/etcd/integration"
|
"github.com/coreos/etcd/integration"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
@ -46,7 +46,8 @@ func TestWatchList(t *testing.T) {
|
|||||||
|
|
||||||
// It tests that
|
// It tests that
|
||||||
// - first occurrence of objects should notify Add event
|
// - first occurrence of objects should notify Add event
|
||||||
// -
|
// - update should trigger Modified event
|
||||||
|
// - update that gets filtered should trigger Deleted event
|
||||||
func testWatch(t *testing.T, recursive bool) {
|
func testWatch(t *testing.T, recursive bool) {
|
||||||
ctx, store, cluster := testSetup(t)
|
ctx, store, cluster := testSetup(t)
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
@ -90,6 +91,7 @@ func testWatch(t *testing.T, recursive bool) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
|
var prevObj *api.Pod
|
||||||
for _, watchTest := range tt.watchTests {
|
for _, watchTest := range tt.watchTests {
|
||||||
out := &api.Pod{}
|
out := &api.Pod{}
|
||||||
key := tt.key
|
key := tt.key
|
||||||
@ -104,8 +106,14 @@ func testWatch(t *testing.T, recursive bool) {
|
|||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
}
|
}
|
||||||
if watchTest.expectEvent {
|
if watchTest.expectEvent {
|
||||||
testCheckResult(t, i, watchTest.watchType, w, nil)
|
expectObj := out
|
||||||
|
if watchTest.watchType == watch.Deleted {
|
||||||
|
expectObj = prevObj
|
||||||
|
expectObj.ResourceVersion = out.ResourceVersion
|
||||||
}
|
}
|
||||||
|
testCheckResult(t, i, watchTest.watchType, w, expectObj)
|
||||||
|
}
|
||||||
|
prevObj = out
|
||||||
}
|
}
|
||||||
w.Stop()
|
w.Stop()
|
||||||
testCheckStop(t, i, w)
|
testCheckStop(t, i, w)
|
||||||
@ -123,7 +131,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||||||
if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil {
|
if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil {
|
||||||
t.Fatalf("Delete failed: %v", err)
|
t.Fatalf("Delete failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, 0, watch.Deleted, w, storedObj)
|
testCheckEventType(t, watch.Deleted, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchSync tests that
|
// TestWatchSync tests that
|
||||||
@ -168,7 +176,7 @@ func TestWatchError(t *testing.T) {
|
|||||||
func(runtime.Object) (runtime.Object, error) {
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil
|
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil
|
||||||
}))
|
}))
|
||||||
testCheckResult(t, 0, watch.Error, w, nil)
|
testCheckEventType(t, watch.Error, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchContextCancel(t *testing.T) {
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
@ -213,6 +221,36 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||||
|
ctx, store, cluster := testSetup(t)
|
||||||
|
defer cluster.Terminate(t)
|
||||||
|
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
|
||||||
|
|
||||||
|
if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil {
|
||||||
|
t.Fatalf("Delete failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := <-w.ResultChan()
|
||||||
|
watchedDeleteObj := e.Object.(*api.Pod)
|
||||||
|
var wres clientv3.WatchResponse
|
||||||
|
wres = <-etcdW
|
||||||
|
|
||||||
|
watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision {
|
||||||
|
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
||||||
|
watchedDeleteRev, wres.Events[0].Kv.ModRevision)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type testWatchStruct struct {
|
type testWatchStruct struct {
|
||||||
obj *api.Pod
|
obj *api.Pod
|
||||||
expectEvent bool
|
expectEvent bool
|
||||||
@ -227,6 +265,17 @@ func (c *testCodec) Decode(data []byte, defaults *unversioned.GroupVersionKind,
|
|||||||
return nil, nil, errors.New("Expected decoding failure")
|
return nil, nil, errors.New("Expected decoding failure")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.Interface) {
|
||||||
|
select {
|
||||||
|
case res := <-w.ResultChan():
|
||||||
|
if res.Type != expectEventType {
|
||||||
|
t.Errorf("event type want=%v, get=%v", expectEventType, res.Type)
|
||||||
|
}
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("time out after waiting %v on ResultChan", wait.ForeverTestTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) {
|
func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) {
|
||||||
select {
|
select {
|
||||||
case res := <-w.ResultChan():
|
case res := <-w.ResultChan():
|
||||||
@ -234,7 +283,7 @@ func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w wat
|
|||||||
t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type)
|
t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if expectObj != nil && !reflect.DeepEqual(expectObj, res.Object) {
|
if !reflect.DeepEqual(expectObj, res.Object) {
|
||||||
t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object)
|
t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object)
|
||||||
}
|
}
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
Loading…
Reference in New Issue
Block a user