mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-18 16:21:13 +00:00
etcd2: watch on 0 to return ADDED always
This commit is contained in:
parent
2c61d2f80c
commit
34db128af9
@ -359,9 +359,6 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
||||
return
|
||||
}
|
||||
action := watch.Added
|
||||
if res.Node.ModifiedIndex != res.Node.CreatedIndex {
|
||||
action = watch.Modified
|
||||
}
|
||||
w.emit(watch.Event{
|
||||
Type: action,
|
||||
Object: obj,
|
||||
@ -454,6 +451,8 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
func (w *etcdWatcher) sendResult(res *etcd.Response) {
|
||||
switch res.Action {
|
||||
case EtcdCreate, EtcdGet:
|
||||
// "Get" will only happen in watch 0 case, where we explicitly want ADDED event
|
||||
// for initial state.
|
||||
w.sendAdd(res)
|
||||
case EtcdSet, EtcdCAS:
|
||||
w.sendModify(res)
|
||||
|
@ -412,6 +412,18 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
}
|
||||
pod.ResourceVersion = ""
|
||||
|
||||
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// The create trigger ADDED event when watching from 0
|
||||
event := <-watching.ResultChan()
|
||||
watching.Stop()
|
||||
if event.Type != watch.Added {
|
||||
t.Errorf("Unexpected event %#v", event)
|
||||
}
|
||||
|
||||
// check for concatenation on watch event with CAS
|
||||
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||
pod := input.(*api.Pod)
|
||||
@ -423,15 +435,15 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||
watching, err = h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watching.Stop()
|
||||
|
||||
// marked as modified b/c of concatenation
|
||||
event := <-watching.ResultChan()
|
||||
if event.Type != watch.Modified {
|
||||
// because we watch from 0, first event that we receive will always be ADDED
|
||||
event = <-watching.ResultChan()
|
||||
if event.Type != watch.Added {
|
||||
t.Errorf("Unexpected event %#v", event)
|
||||
}
|
||||
|
||||
@ -451,7 +463,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
||||
t.Errorf("Unexpected event %#v", event)
|
||||
}
|
||||
|
||||
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
|
||||
if e, a := pod, event.Object; a == nil || !api.Semantic.DeepDerivative(e, a) {
|
||||
t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user