From 780accb3ba9bbefd60b19efa4959ec4e165acdbb Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 28 Jul 2015 16:06:05 -0400 Subject: [PATCH] Kubelet should garbage collect dead pods The sync loop should check for terminated pods that are no longer running and clear them. The status loop should never write status if the pod UID changes. Mirror pods should be deleted immediately rather than gracefully. --- pkg/kubelet/dockertools/manager.go | 4 ++++ pkg/kubelet/kubelet.go | 30 ++++++++++++++++++++++++++++++ pkg/kubelet/mirror_client.go | 2 +- pkg/kubelet/status_manager.go | 27 +++++++++++++++++++++++++++ pkg/kubelet/status_manager_test.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index e019495caa4..8c55d779b3c 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -1261,6 +1261,10 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con gracePeriod = minimumGracePeriodInSeconds } err := dm.client.StopContainer(ID, uint(gracePeriod)) + if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil { + glog.V(4).Infof("Container %q has already exited", name) + return nil + } if err == nil { glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time)) } else { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 41850d039c7..a34d756b3a4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1367,6 +1367,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco return nil } +// Delete any pods that are no longer running and are marked for deletion. +func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { + var terminating []*api.Pod + for _, pod := range pods { + if pod.DeletionTimestamp != nil { + found := false + for _, runningPod := range runningPods { + if runningPod.ID == pod.UID { + found = true + break + } + } + if found { + podFullName := kubecontainer.GetPodFullName(pod) + glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID) + continue + } + terminating = append(terminating, pod) + } + } + if !kl.statusManager.TerminatePods(terminating) { + return errors.New("not all pods were successfully terminated") + } + return nil +} + // pastActiveDeadline returns true if the pod has been active for more than // ActiveDeadlineSeconds. func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { @@ -1529,6 +1555,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro // Remove any orphaned mirror pods. kl.podManager.DeleteOrphanedMirrorPods() + if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil { + glog.Errorf("Failed to cleanup terminated pods: %v", err) + } + return err } diff --git a/pkg/kubelet/mirror_client.go b/pkg/kubelet/mirror_client.go index ae54c33017c..8ef630d1b7e 100644 --- a/pkg/kubelet/mirror_client.go +++ b/pkg/kubelet/mirror_client.go @@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error { return err } glog.V(4).Infof("Deleting a mirror pod %q", podFullName) - if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil { + if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil { glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err) } return nil diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 4d754286309..448c5bd5853 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -131,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) { } } +// TerminatePods resets the container status for the provided pods to terminated and triggers +// a status update. This function may not enqueue all the provided pods, in which case it will +// return false +func (s *statusManager) TerminatePods(pods []*api.Pod) bool { + sent := true + s.podStatusesLock.Lock() + defer s.podStatusesLock.Unlock() + for _, pod := range pods { + for i := range pod.Status.ContainerStatuses { + pod.Status.ContainerStatuses[i].State = api.ContainerState{ + Terminated: &api.ContainerStateTerminated{}, + } + } + select { + case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}: + default: + sent = false + glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod)) + } + } + return sent +} + func (s *statusManager) DeletePodStatus(podFullName string) { s.podStatusesLock.Lock() defer s.podStatusesLock.Unlock() @@ -167,6 +190,10 @@ func (s *statusManager) syncBatch() error { return nil } if err == nil { + if len(pod.UID) > 0 && statusPod.UID != pod.UID { + glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod)) + return nil + } statusPod.Status = status // TODO: handle conflict as a retry, make that easier too. statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod) diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go index 585d2ffd34f..22e93275024 100644 --- a/pkg/kubelet/status_manager_test.go +++ b/pkg/kubelet/status_manager_test.go @@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) { verifyUpdates(t, syncer, 1) } +func TestSyncBatchIgnoresNotFound(t *testing.T) { + syncer := newTestStatusManager() + syncer.SetPodStatus(testPod, getRandomPodStatus()) + err := syncer.syncBatch() + if err != nil { + t.Errorf("unexpected syncing error: %v", err) + } + verifyActions(t, syncer.kubeClient, []testclient.Action{ + testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, + }) +} + func TestSyncBatch(t *testing.T) { syncer := newTestStatusManager() + syncer.kubeClient = testclient.NewSimpleFake(testPod) syncer.SetPodStatus(testPod, getRandomPodStatus()) err := syncer.syncBatch() if err != nil { @@ -167,6 +180,22 @@ func TestSyncBatch(t *testing.T) { ) } +func TestSyncBatchChecksMismatchedUID(t *testing.T) { + syncer := newTestStatusManager() + testPod.UID = "first" + differentPod := *testPod + differentPod.UID = "second" + syncer.kubeClient = testclient.NewSimpleFake(testPod) + syncer.SetPodStatus(&differentPod, getRandomPodStatus()) + err := syncer.syncBatch() + if err != nil { + t.Errorf("unexpected syncing error: %v", err) + } + verifyActions(t, syncer.kubeClient, []testclient.Action{ + testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, + }) +} + // shuffle returns a new shuffled list of container statuses. func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus { numStatuses := len(statuses)