Watch should start from next index when getting the initial state

Also, the fake response we return should have an Action that is
appropriate for the operation.
This commit is contained in:
Clayton Coleman 2014-08-14 15:51:56 -04:00
parent 778c89f65e
commit 7c67cbff04
2 changed files with 33 additions and 41 deletions

View File

@ -419,11 +419,11 @@ func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, inc
return return
} }
if index, ok := etcdErrorIndex(err); ok { if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index resourceVersion = index + 1
} }
return return
} }
resourceVersion = resp.EtcdIndex resourceVersion = resp.EtcdIndex + 1
convertRecursiveResponse(resp.Node, resp, incoming) convertRecursiveResponse(resp.Node, resp, incoming)
return return
} }
@ -438,6 +438,11 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
return return
} }
copied := *response copied := *response
if node.ModifiedIndex == node.CreatedIndex {
copied.Action = "create"
} else {
copied.Action = "set"
}
copied.Node = node copied.Node = node
incoming <- &copied incoming <- &copied
} }

View File

@ -464,9 +464,9 @@ func TestWatch(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
// when no get can be done AND the server doesn't provide an index, the Watch is 0 (from now) // when server returns not found, the watch index starts at the next value (1)
if fakeClient.WatchIndex != 0 { if fakeClient.WatchIndex != 1 {
t.Errorf("Expected client to be at index %d, got %#v", 0, fakeClient) t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient)
} }
// Test normal case // Test normal case
@ -507,52 +507,36 @@ func TestWatchFromZeroIndex(t *testing.T) {
ExpectedVersion uint64 ExpectedVersion uint64
ExpectedType watch.EventType ExpectedType watch.EventType
}{ }{
"last write was a modify": { "get value created": {
EtcdResponseWithError{ EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: api.EncodeOrDie(pod), Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 1, ModifiedIndex: 1,
}, },
Action: "compareAndSwap", Action: "get",
EtcdIndex: 2, EtcdIndex: 2,
}, },
}, },
1, 1,
watch.Modified,
},
"last write was a delete": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 1,
},
Action: "delete",
EtcdIndex: 3,
},
},
2,
watch.Deleted,
},
"last write was a create": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
ModifiedIndex: 2,
},
Action: "create",
EtcdIndex: 3,
},
},
2,
watch.Added, watch.Added,
}, },
"get value modified": {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: api.EncodeOrDie(pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
Action: "get",
EtcdIndex: 3,
},
},
2,
watch.Modified,
},
} }
for k, testCase := range testCases { for k, testCase := range testCases {
@ -566,6 +550,9 @@ func TestWatchFromZeroIndex(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a {
t.Errorf("%s: expected watch index to be %d, got %d", k, e, a)
}
// the existing node is detected and the index set // the existing node is detected and the index set
event := <-watching.ResultChan() event := <-watching.ResultChan()
@ -661,8 +648,8 @@ func TestWatchFromNotFound(t *testing.T) {
} }
fakeClient.WaitForWatchCompletion() fakeClient.WaitForWatchCompletion()
if fakeClient.WatchIndex != 2 { if fakeClient.WatchIndex != 3 {
t.Errorf("Expected client to wait for %d, got %#v", 2, fakeClient) t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient)
} }
watching.Stop() watching.Stop()