Merge pull request #897 from smarterclayton/watch_starting_too_early

Watch should start from next index when getting the initial state
This commit is contained in:
brendandburns 2014-08-15 09:46:13 -07:00
commit 39d2020392
2 changed files with 33 additions and 41 deletions

View File

@ -423,11 +423,11 @@ func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, inc
return return
} }
if index, ok := etcdErrorIndex(err); ok { if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index resourceVersion = index + 1
} }
return return
} }
resourceVersion = resp.EtcdIndex resourceVersion = resp.EtcdIndex + 1
convertRecursiveResponse(resp.Node, resp, incoming) convertRecursiveResponse(resp.Node, resp, incoming)
return return
} }
@ -442,6 +442,11 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
return return
} }
copied := *response copied := *response
if node.ModifiedIndex == node.CreatedIndex {
copied.Action = "create"
} else {
copied.Action = "set"
}
copied.Node = node copied.Node = node
incoming <- &copied incoming <- &copied
} }

View File

@ -496,9 +496,9 @@ func TestWatch(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
// when no get can be done AND the server doesn't provide an index, the Watch is 0 (from now) // when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 0 { if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 0, fakeClient) t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
} }
// Test normal case // Test normal case
@ -539,52 +539,36 @@ func TestWatchFromZeroIndex(t *testing.T) {
ExpectedVersion uint64 ExpectedVersion uint64
ExpectedType watch.EventType ExpectedType watch.EventType
}{ }{
"last write was a modify": { "get value created": {
EtcdResponseWithError{ EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: api.EncodeOrDie(pod), Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
Action: "compareAndSwap", Action: "get",
EtcdIndex: 2, EtcdIndex: 2,
}, },
}, },
1, 1,
watch.Modified,
},
"last write was a delete": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 1,
},
Action: "delete",
EtcdIndex: 3,
},
},
2,
watch.Deleted,
},
"last write was a create": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
},
Action: "create",
EtcdIndex: 3,
},
},
2,
watch.Added, watch.Added,
}, },
"get value modified": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
2,
watch.Modified,
},
} }
for k, testCase := range testCases { for k, testCase := range testCases {
@ -598,6 +582,9 @@ func TestWatchFromZeroIndex(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set // the existing node is detected and the index set
event := <-watching.ResultChan() event := <-watching.ResultChan()
@ -693,8 +680,8 @@ func TestWatchFromNotFound(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 2 { if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 2, fakeClient) t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
} }
watching.Stop() watching.Stop()