From 275002173e2d7c53b86965016fb0f180f92f8ec9 Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Tue, 14 Apr 2015 15:26:50 -0700
Subject: [PATCH] Kubelet: parallelize cleaning up containers in unwanted pods

Kubelet kills unwanted pods in SyncPods, which directly impacts the
latency of a sync iteration. This change parallelizes the cleanup to
lessen the effect. Eventually, we should leverage per-pod workers for
cleanup, with the exception of truly orphaned pods.
---
 pkg/kubelet/dockertools/manager.go |   2 +-
 pkg/kubelet/kubelet.go             | 121 ++++++++++++++++++++---------
 2 files changed, 85 insertions(+), 38 deletions(-)

diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index 8585cf1ae6f..4e8b15e1a28 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -375,7 +375,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
 }

 func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
-	result := []*docker.Container{}
+	var result []*docker.Container
 	if dm.client == nil {
 		return nil, fmt.Errorf("unexpected nil docker client.")
 	}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index cf7773c176a..f863d792170 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -937,17 +937,27 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
 }

 // Kill all running containers in a pod (includes the pod infra container).
-func (kl *Kubelet) killPod(runningPod kubecontainer.Pod) error {
+func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
 	// Send the kills in parallel since they may take a long time.
-	errs := make(chan error, len(runningPod.Containers))
+	errs := make(chan error, len(pod.Containers))
 	wg := sync.WaitGroup{}
-	for _, container := range runningPod.Containers {
+	for _, container := range pod.Containers {
 		wg.Add(1)
 		go func(container *kubecontainer.Container) {
 			defer util.HandleCrash()
+			// Call the networking plugin for teardown.
+			// TODO: Handle this without signaling the pod infra container to
+			// adapt to the generic container runtime.
+			if container.Name == dockertools.PodInfraContainerName {
+				err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(container.ID))
+				if err != nil {
+					glog.Errorf("Failed tearing down the infra container: %v", err)
+					errs <- err
+				}
+			}
 			err := kl.killContainer(container)
 			if err != nil {
-				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
+				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
 				errs <- err
 			}
 			wg.Done()
@@ -1330,7 +1340,7 @@ func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
 	return desiredVolumes
 }

-func (kl *Kubelet) cleanupOrphanedPods(pods []*api.Pod) error {
+func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {
 	desired := util.NewStringSet()
 	for _, pod := range pods {
 		desired.Insert(string(pod.UID))
@@ -1452,48 +1462,25 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 		return nil
 	}

-	// Kill any containers we don't need.
-	killed := []string{}
-	for _, pod := range runningPods {
-		if _, found := desiredPods[pod.ID]; found {
-			// syncPod() will handle this one.
-			continue
-		}
-
-		// Kill all the containers in the unidentified pod.
-		for _, c := range pod.Containers {
-			// call the networking plugin for teardown
-			if c.Name == dockertools.PodInfraContainerName {
-				err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(c.ID))
-				if err != nil {
-					glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
-				}
-			}
-			glog.V(1).Infof("Killing unwanted container %+v", c)
-			err = kl.killContainer(c)
-			if err != nil {
-				glog.Errorf("Error killing container %+v: %v", c, err)
-			} else {
-				killed = append(killed, string(c.ID))
-			}
-		}
-	}
-
-	running, err := kl.containerManager.GetRunningContainers(killed)
+	// Kill containers associated with unwanted pods and get a list of
+	// unwanted containers that are still running.
+	running, err := kl.killUnwantedPods(desiredPods, runningPods)
 	if err != nil {
-		glog.Errorf("Failed to poll container state: %v", err)
+		glog.Errorf("Failed killing unwanted containers: %v", err)
 		return err
 	}

 	// Remove any orphaned volumes.
 	err = kl.cleanupOrphanedVolumes(pods, running)
 	if err != nil {
+		glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
 		return err
 	}

-	// Remove any orphaned pods.
-	err = kl.cleanupOrphanedPods(pods)
+	// Remove any orphaned pod directories.
+	err = kl.cleanupOrphanedPodDirs(pods)
 	if err != nil {
+		glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
 		return err
 	}

@@ -1503,6 +1490,67 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 	return err
 }

+// killUnwantedPods kills the unwanted, running pods in parallel, and returns
+// containers in those pods that it failed to terminate.
+func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
+	runningPods []*kubecontainer.Pod) ([]*docker.Container, error) {
+	type result struct {
+		containers []*docker.Container
+		err        error
+	}
+	ch := make(chan result, len(runningPods))
+	defer close(ch)
+	numWorkers := 0
+	for _, pod := range runningPods {
+		if _, found := desiredPods[pod.ID]; found {
+			// Per-pod workers will handle the desired pods.
+			continue
+		}
+		numWorkers++
+		go func(pod *kubecontainer.Pod, ch chan result) {
+			defer func() {
+				// Send the IDs of the containers that we failed to kill.
+				containers, err := kl.getRunningContainersByPod(pod)
+				ch <- result{containers: containers, err: err}
+			}()
+			glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
+			// Stop the containers.
+			err := kl.killPod(*pod)
+			if err != nil {
+				glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
+				return
+			}
+			// Remove the pod directory.
+			err = os.RemoveAll(kl.getPodDir(pod.ID))
+			if err != nil {
+				glog.Errorf("Failed removing pod directory for %q", pod.Name)
+				return
+			}
+		}(pod, ch)
+	}
+
+	// Aggregate results from the pod killing workers.
+	var errs []error
+	var running []*docker.Container
+	for i := 0; i < numWorkers; i++ {
+		m := <-ch
+		if m.err != nil {
+			errs = append(errs, m.err)
+			continue
+		}
+		running = append(running, m.containers...)
+	}
+	return running, utilErrors.NewAggregate(errs)
+}
+
+func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) {
+	containerIDs := make([]string, len(pod.Containers))
+	for i, c := range pod.Containers {
+		containerIDs[i] = string(c.ID)
+	}
+	return kl.containerManager.GetRunningContainers(containerIDs)
+}
+
 type podsByCreationTime []*api.Pod

 func (s podsByCreationTime) Len() int {
@@ -1627,7 +1675,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 				unsyncedPod = false
 			}
 		}
-
 		pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
 		if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
 			glog.Errorf("Couldn't sync containers: %v", err)
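The sketch below is not part of the patch; it is a minimal, standalone Go illustration of the fan-out pattern the commit message describes: spawn one goroutine per unwanted pod, report each worker's outcome over a buffered channel so senders never block, and fold all failures into a single aggregate error. The Pod type and the killPod, cleanupPodDir, and killUnwantedPods helpers here are hypothetical stand-ins for the kubelet's types, and it uses errors.Join (Go 1.20+) from the standard library in place of the repository's utilErrors.NewAggregate.

// parallel_cleanup_sketch.go: illustrative only, not kubelet code.
package main

import (
	"errors"
	"fmt"
)

// Pod is a hypothetical stand-in for kubecontainer.Pod.
type Pod struct {
	ID   string
	Name string
}

// killPod and cleanupPodDir stand in for the real kubelet operations
// (stopping the pod's containers, removing its directory on disk).
func killPod(p Pod) error       { return nil }
func cleanupPodDir(p Pod) error { return nil }

// killUnwantedPods kills every running pod that is not desired, in parallel,
// and returns an aggregate of all errors encountered.
func killUnwantedPods(desired map[string]bool, running []Pod) error {
	type result struct {
		pod Pod
		err error
	}
	// Buffered to the maximum number of workers so a worker's send never
	// blocks, even if the caller stops reading early.
	ch := make(chan result, len(running))
	workers := 0
	for _, pod := range running {
		if desired[pod.ID] {
			continue // desired pods are handled elsewhere (e.g. per-pod workers)
		}
		workers++
		go func(pod Pod) {
			// Stop the containers first, then remove the pod directory.
			err := killPod(pod)
			if err == nil {
				err = cleanupPodDir(pod)
			}
			ch <- result{pod: pod, err: err}
		}(pod)
	}

	// Aggregate results from exactly as many workers as were started.
	var errs []error
	for i := 0; i < workers; i++ {
		if r := <-ch; r.err != nil {
			errs = append(errs, fmt.Errorf("pod %q: %w", r.pod.Name, r.err))
		}
	}
	return errors.Join(errs...) // nil when no worker failed
}

func main() {
	desired := map[string]bool{"pod-a": true}
	running := []Pod{{ID: "pod-a", Name: "a"}, {ID: "pod-b", Name: "b"}}
	fmt.Println(killUnwantedPods(desired, running)) // prints <nil>
}

As in the patch, counting the started workers and reading that many results keeps the latency of one sync iteration bounded by the slowest cleanup rather than the sum of all cleanups, while still surfacing every failure to the caller.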