Merge pull request #38079 from hongchaodeng/e2

Automatic merge from submit-queue

etcd2: watching from 0 returns all initial states as ADDED events

ref:
https://github.com/kubernetes/kubernetes/pull/36797
https://github.com/kubernetes/kubernetes/issues/36545
https://github.com/kubernetes/kubernetes/pull/36561
https://github.com/kubernetes/kubernetes/issues/13969

Since we have made consensus and fixed the behavior in etcd3, we would also change etcd2 to make this uniform and consistent. The end goal is that we would have it explicit on interface docs.

**release note**:
```
etcd2: watching from 0 returns all initial states as ADDED events
```
This commit is contained in:
Kubernetes Submit Queue 2016-12-06 02:45:29 -08:00 committed by GitHub
commit 8e8599fcd7
3 changed files with 23 additions and 8 deletions

View File

@ -359,9 +359,6 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
return
}
action := watch.Added
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
action = watch.Modified
}
w.emit(watch.Event{
Type: action,
Object: obj,
@ -454,6 +451,8 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case EtcdCreate, EtcdGet:
// "Get" will only happen in watch 0 case, where we explicitly want ADDED event
// for initial state.
w.sendAdd(res)
case EtcdSet, EtcdCAS:
w.sendModify(res)

View File

@ -412,6 +412,18 @@ func TestWatchFromZeroIndex(t *testing.T) {
}
pod.ResourceVersion = ""
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// The create trigger ADDED event when watching from 0
event := <-watching.ResultChan()
watching.Stop()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
// check for concatenation on watch event with CAS
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*api.Pod)
@ -423,15 +435,15 @@ func TestWatchFromZeroIndex(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
watching, err = h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
// marked as modified b/c of concatenation
event := <-watching.ResultChan()
if event.Type != watch.Modified {
// because we watch from 0, first event that we receive will always be ADDED
event = <-watching.ResultChan()
if event.Type != watch.Added {
t.Errorf("Unexpected event %#v", event)
}
@ -451,7 +463,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
t.Errorf("Unexpected event %#v", event)
}
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
if e, a := pod, event.Object; a == nil || !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
}
}

View File

@ -112,6 +112,8 @@ type Interface interface {
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
@ -119,6 +121,8 @@ type Interface interface {
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either