diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 4bfc3423561..19c0533f2fd 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -59,7 +59,16 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface } func (s *sourceEtcd) run() { - watching := s.helper.Watch(s.key, 0) + boundPods := api.BoundPods{} + err := s.helper.ExtractToList(s.key, &boundPods) + if err != nil { + glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err) + return + } + // Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen + s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource} + index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods) + watching := s.helper.Watch(s.key, index) for { select { case event, ok := <-watching.ResultChan(): @@ -87,6 +96,9 @@ func (s *sourceEtcd) run() { // It returns a list of containers, or an error if one occurs. func eventToPods(ev watch.Event) ([]api.BoundPod, error) { pods := []api.BoundPod{} + if ev.Object == nil { + return pods, nil + } boundPods, ok := ev.Object.(*api.BoundPods) if !ok { return pods, errors.New("unable to parse response as BoundPods") diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index 047e71dbfe3..80f25da514e 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -33,7 +33,7 @@ func TestEventToPods(t *testing.T) { { input: watch.Event{Object: nil}, pods: []api.BoundPod{}, - fail: true, + fail: false, }, { input: watch.Event{Object: &api.BoundPods{}}, diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 84667dad592..9af7ccdd8fa 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -64,6 +64,8 @@ func (s *sourceFile) extractFromPath() error { if !os.IsNotExist(err) { return err } + // Emit an update with an empty PodList to allow FileSource to be marked as seen + s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.FileSource} return fmt.Errorf("path does not exist, ignoring") } diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 3c9e2a3e4ee..163163621bc 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -92,8 +92,14 @@ func TestUpdateOnNonExistentFile(t *testing.T) { NewSourceFile("random_non_existent_path", time.Millisecond, ch) select { case got := <-ch: - t.Errorf("Expected no update, Got %#v", got) + update := got.(kubelet.PodUpdate) + expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource) + if !api.Semantic.DeepEqual(expected, update) { + t.Fatalf("Expected %#v, Got %#v", expected, update) + } + case <-time.After(2 * time.Millisecond): + t.Errorf("Expected update, timeout instead") } } diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 0670f33a8da..20185999f7e 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -69,6 +69,8 @@ func (s *sourceURL) extractFromURL() error { return fmt.Errorf("%v: %v", s.url, resp.Status) } if len(data) == 0 { + // Emit an update with an empty PodList to allow HTTPSource to be marked as seen + s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.HTTPSource} return fmt.Errorf("zero-length data received from %v", s.url) } // Short circuit if the manifest has not changed since the last time it was read. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index cfc7813ac82..a20e6f16001 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1118,9 +1118,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } case <-time.After(kl.resyncInterval): glog.V(4).Infof("Periodic sync") - if kl.pods == nil { - continue - } } err := handler.SyncPods(kl.pods)