Merge pull request #569 from smarterclayton/fix_bad_watch_in_etcd

Kubelet etcd watch skipping indices
This commit is contained in:
Daniel Smith 2014-07-22 18:23:00 -07:00
commit 0ef600c8cd
2 changed files with 10 additions and 10 deletions

View File

@ -62,20 +62,20 @@ func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, up
func (s *SourceEtcd) run() { func (s *SourceEtcd) run() {
index := uint64(0) index := uint64(0)
for { for {
lastIndex, err := s.fetchNextState(index) nextIndex, err := s.fetchNextState(index)
if err != nil { if err != nil {
if !tools.IsEtcdNotFound(err) { if !tools.IsEtcdNotFound(err) {
glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err) glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err)
} }
return return
} }
index = lastIndex + 1 index = nextIndex
} }
} }
// fetchNextState fetches the key (or waits for a change to a key) and then returns // fetchNextState fetches the key (or waits for a change to a key) and then returns
// the index read. It will watch no longer than s.waitDuration and then return // the nextIndex to read. It will watch no longer than s.waitDuration and then return
func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) { func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err error) {
var response *etcd.Response var response *etcd.Response
if fromIndex == 0 { if fromIndex == 0 {
@ -87,7 +87,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err
} }
} }
if err != nil { if err != nil {
return 0, err return fromIndex, err
} }
pods, err := responseToPods(response) pods, err := responseToPods(response)
@ -99,7 +99,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err
glog.Infof("Got state from etcd: %+v", pods) glog.Infof("Got state from etcd: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET} s.updates <- kubelet.PodUpdate{pods, kubelet.SET}
return response.Node.ModifiedIndex, nil return response.Node.ModifiedIndex + 1, nil
} }
// responseToPods takes an etcd Response object, and turns it into a structured list of containers. // responseToPods takes an etcd Response object, and turns it into a structured list of containers.

View File

@ -83,8 +83,8 @@ func TestGetEtcd(t *testing.T) {
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(0) lastIndex, err := c.fetchNextState(0)
expectNoError(t, err) expectNoError(t, err)
if lastIndex != 1 { if lastIndex != 2 {
t.Errorf("Expected %#v, Got %#v", 1, lastIndex) t.Errorf("Expected %#v, Got %#v", 2, lastIndex)
} }
update := (<-ch).(kubelet.PodUpdate) update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}})
@ -108,8 +108,8 @@ func TestWatchEtcd(t *testing.T) {
c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond}
lastIndex, err := c.fetchNextState(1) lastIndex, err := c.fetchNextState(1)
expectNoError(t, err) expectNoError(t, err)
if lastIndex != 2 { if lastIndex != 3 {
t.Errorf("Expected %d, Got %d", 1, lastIndex) t.Errorf("Expected %d, Got %d", 3, lastIndex)
} }
update := (<-ch).(kubelet.PodUpdate) update := (<-ch).(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET) expected := CreatePodUpdate(kubelet.SET)