From bc0ac1e81b5dd0beb95f63a22893b3466e9ab50f Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 22 Jul 2014 20:21:41 -0400 Subject: [PATCH] Kubelet etcd watch skipping indices The next index to watch should always be current+1 if we got a response from etcd, but otherwise it should always be current. Moved the increment into fetchNextState which returns the next index to watch for (or the same if an error occurs) --- pkg/kubelet/config/etcd.go | 12 ++++++------ pkg/kubelet/config/etcd_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 48dc7b411b4..3ab7a0cd084 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -62,20 +62,20 @@ func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, up func (s *SourceEtcd) run() { index := uint64(0) for { - lastIndex, err := s.fetchNextState(index) + nextIndex, err := s.fetchNextState(index) if err != nil { if !tools.IsEtcdNotFound(err) { glog.Errorf("Unable to extract from the response (%s): %%v", s.key, err) } return } - index = lastIndex + 1 + index = nextIndex } } // fetchNextState fetches the key (or waits for a change to a key) and then returns -// the index read. It will watch no longer than s.waitDuration and then return -func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err error) { +// the nextIndex to read. It will watch no longer than s.waitDuration and then return +func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err error) { var response *etcd.Response if fromIndex == 0 { @@ -87,7 +87,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err } } if err != nil { - return 0, err + return fromIndex, err } pods, err := responseToPods(response) @@ -99,7 +99,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (lastIndex uint64, err err glog.Infof("Got state from etcd: %+v", pods) s.updates <- kubelet.PodUpdate{pods, kubelet.SET} - return response.Node.ModifiedIndex, nil + return response.Node.ModifiedIndex + 1, nil } // responseToPods takes an etcd Response object, and turns it into a structured list of containers. diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index 462ac77ea2d..e71ff32f0a8 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -83,8 +83,8 @@ func TestGetEtcd(t *testing.T) { c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} lastIndex, err := c.fetchNextState(0) expectNoError(t, err) - if lastIndex != 1 { - t.Errorf("Expected %#v, Got %#v", 1, lastIndex) + if lastIndex != 2 { + t.Errorf("Expected %#v, Got %#v", 2, lastIndex) } update := (<-ch).(kubelet.PodUpdate) expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}) @@ -108,8 +108,8 @@ func TestWatchEtcd(t *testing.T) { c := SourceEtcd{"/registry/hosts/machine/kubelet", fakeClient, ch, time.Millisecond} lastIndex, err := c.fetchNextState(1) expectNoError(t, err) - if lastIndex != 2 { - t.Errorf("Expected %d, Got %d", 1, lastIndex) + if lastIndex != 3 { + t.Errorf("Expected %d, Got %d", 3, lastIndex) } update := (<-ch).(kubelet.PodUpdate) expected := CreatePodUpdate(kubelet.SET)