Fix "Kubelet doesn't kill old pods when BoundPods is empty" issue

This commit is contained in:
saadali 2015-01-08 23:01:07 -08:00
parent 969c4b8c49
commit e1917cf900
6 changed files with 25 additions and 6 deletions

View File

@ -59,7 +59,16 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
}
func (s *sourceEtcd) run() {
watching := s.helper.Watch(s.key, 0)
boundPods := api.BoundPods{}
err := s.helper.ExtractToList(s.key, &boundPods)
if err != nil {
glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err)
return
}
// Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource}
index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods)
watching := s.helper.Watch(s.key, index)
for {
select {
case event, ok := <-watching.ResultChan():
@ -87,6 +96,9 @@ func (s *sourceEtcd) run() {
// It returns a list of containers, or an error if one occurs.
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
pods := []api.BoundPod{}
if ev.Object == nil {
return pods, nil
}
boundPods, ok := ev.Object.(*api.BoundPods)
if !ok {
return pods, errors.New("unable to parse response as BoundPods")

View File

@ -33,7 +33,7 @@ func TestEventToPods(t *testing.T) {
{
input: watch.Event{Object: nil},
pods: []api.BoundPod{},
fail: true,
fail: false,
},
{
input: watch.Event{Object: &api.BoundPods{}},

View File

@ -64,6 +64,8 @@ func (s *sourceFile) extractFromPath() error {
if !os.IsNotExist(err) {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.FileSource}
return fmt.Errorf("path does not exist, ignoring")
}

View File

@ -92,8 +92,14 @@ func TestUpdateOnNonExistentFile(t *testing.T) {
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
update := got.(kubelet.PodUpdate)
expected := CreatePodUpdate(kubelet.SET, kubelet.FileSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(2 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}

View File

@ -69,6 +69,8 @@ func (s *sourceURL) extractFromURL() error {
return fmt.Errorf("%v: %v", s.url, resp.Status)
}
if len(data) == 0 {
// Emit an update with an empty PodList to allow HTTPSource to be marked as seen
s.updates <- kubelet.PodUpdate{[]api.BoundPod{}, kubelet.SET, kubelet.HTTPSource}
return fmt.Errorf("zero-length data received from %v", s.url)
}
// Short circuit if the manifest has not changed since the last time it was read.

View File

@ -1118,9 +1118,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
if kl.pods == nil {
continue
}
}
err := handler.SyncPods(kl.pods)