Kubelet etcd watch skipping indices

The next index to watch should always be current+1 if we got a
response from etcd, but otherwise it should always be current.
Moved the increment into fetchNextState which returns the next
index to watch for (or the same if an error occurs)
This commit is contained in:
Clayton Coleman 2014-07-22 20:21:41 -04:00
parent e7f4460ab8
commit bc0ac1e81b
2 changed files with 10 additions and 10 deletions

View File

@ -62,20 +62,20 @@ func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, up
func (s *SourceEtcd) run() {
index := uint64(0)
for {
lastIndex, err := s.fetchNextState(index)
nextIndex, err := s.fetchNextState(index)
if err != nil {
if !tools.IsEtcdNotFound(err) {
glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err)
}
return
}
index = lastIndex + 1
index = nextIndex
}
}
// fetchNextState fetches the key (or waits for a change to a key) and then returns
// the index read. It will watch no longer than s.waitDuration and then return
func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) {
// the nextIndex to read. It will watch no longer than s.waitDuration and then return
func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err error) {
var response *etcd.Response
if fromIndex == 0 {
@ -87,7 +87,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err
}
}
if err != nil {
return 0, err
return fromIndex, err
}
pods, err := responseToPods(response)
@ -99,7 +99,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err
glog.Infof("Got state from etcd: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
return response.Node.ModifiedIndex, nil
return response.Node.ModifiedIndex + 1, nil
}
// responseToPods takes an etcd Response object, and turns it into a structured list of containers.

View File

@ -83,8 +83,8 @@ func TestGetEtcd(t *testing.T) {
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(0)
expectNoError(t, err)
if lastIndex != 1 {
t.Errorf("Expected %#v, Got %#v", 1, lastIndex)
if lastIndex != 2 {
t.Errorf("Expected %#v, Got %#v", 2, lastIndex)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}})
@ -108,8 +108,8 @@ func TestWatchEtcd(t *testing.T) {
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(1)
expectNoError(t, err)
if lastIndex != 2 {
t.Errorf("Expected %d, Got %d", 1, lastIndex)
if lastIndex != 3 {
t.Errorf("Expected %d, Got %d", 3, lastIndex)
}
update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET)