Merge pull request #7048 from yujuhong/para_cleanup

Kubelet: parallelize cleaning up containers in unwanted pods
Victor Marmol 2015-04-20 14:59:07 -07:00
commit d44e9b4880
2 changed files with 85 additions and 38 deletions
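
The heart of the change: instead of killing each unwanted pod's containers one by one inside SyncPods, cleanup now fans out to one goroutine per unwanted pod, and results are collected over a buffered channel, so one slow container stop no longer stalls the whole sync pass. A minimal standalone sketch of that fan-out/fan-in shape, independent of the kubelet code (cleanup and the pod names here are illustrative):

package main

import (
	"fmt"
	"time"
)

// cleanup stands in for stopping one pod's containers; in the diff below
// this role is played by kl.killPod.
func cleanup(podID string) error {
	time.Sleep(10 * time.Millisecond) // simulate a slow container stop
	if podID == "pod-b" {
		return fmt.Errorf("%s: stop timed out", podID)
	}
	return nil
}

func main() {
	pods := []string{"pod-a", "pod-b", "pod-c"}
	type result struct {
		podID string
		err   error
	}
	// Buffer the channel to the worker count so no sender can block.
	ch := make(chan result, len(pods))
	for _, id := range pods {
		go func(id string) {
			ch <- result{podID: id, err: cleanup(id)}
		}(id)
	}
	// Fan-in: exactly one receive per worker.
	for range pods {
		if r := <-ch; r.err != nil {
			fmt.Printf("cleanup of %s failed: %v\n", r.podID, r.err)
		}
	}
}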


@@ -375,7 +375,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
 }
 
 func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
-	result := []*docker.Container{}
+	var result []*docker.Container
 	if dm.client == nil {
 		return nil, fmt.Errorf("unexpected nil docker client.")
 	}
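
The only change in this hunk is initialization style: a nil slice declaration replaces an empty slice literal. append works on a nil slice, so callers see identical behavior, and the empty-result case avoids an allocation. A small sketch of the equivalence, using only the standard library:

package main

import "fmt"

func main() {
	var a []int  // nil slice: no backing array allocated
	b := []int{} // empty but non-nil slice
	fmt.Println(a == nil, b == nil) // true false
	a = append(a, 1)                // append allocates lazily, on first use
	b = append(b, 1)
	fmt.Println(a, b) // [1] [1]
}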


@@ -937,17 +937,27 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
 }
 
 // Kill all running containers in a pod (includes the pod infra container).
-func (kl *Kubelet) killPod(runningPod kubecontainer.Pod) error {
+func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
 	// Send the kills in parallel since they may take a long time.
-	errs := make(chan error, len(runningPod.Containers))
+	errs := make(chan error, len(pod.Containers))
 	wg := sync.WaitGroup{}
-	for _, container := range runningPod.Containers {
+	for _, container := range pod.Containers {
 		wg.Add(1)
 		go func(container *kubecontainer.Container) {
 			defer util.HandleCrash()
+			// Call the networking plugin for teardown.
+			// TODO: Handle this without signaling the pod infra container to
+			// adapt to the generic container runtime.
+			if container.Name == dockertools.PodInfraContainerName {
+				err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(container.ID))
+				if err != nil {
+					glog.Errorf("Failed tearing down the infra container: %v", err)
+					errs <- err
+				}
+			}
 			err := kl.killContainer(container)
 			if err != nil {
-				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
+				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
 				errs <- err
 			}
 			wg.Done()
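
killPod pairs a sync.WaitGroup with an error channel whose buffer matches the container count, so every worker can report a failure without ever blocking, even before the receiver starts draining. A reduced sketch of that pattern, with stop as a hypothetical stand-in for kl.killContainer:

package main

import (
	"fmt"
	"sync"
)

// stop is a stand-in for killing one container.
func stop(name string) error {
	if name == "infra" {
		return fmt.Errorf("stop %s: timeout", name)
	}
	return nil
}

func killAll(containers []string) error {
	// Buffer sized to the container count: sends never block.
	errs := make(chan error, len(containers))
	var wg sync.WaitGroup
	for _, c := range containers {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			if err := stop(c); err != nil {
				errs <- err
			}
		}(c)
	}
	wg.Wait()
	close(errs)
	// Return the first error, if any, once all workers are done.
	for err := range errs {
		return err
	}
	return nil
}

func main() {
	fmt.Println(killAll([]string{"app", "infra"}))
}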
@@ -1330,7 +1340,7 @@ func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
 	return desiredVolumes
 }
 
-func (kl *Kubelet) cleanupOrphanedPods(pods []*api.Pod) error {
+func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {
 	desired := util.NewStringSet()
 	for _, pod := range pods {
 		desired.Insert(string(pod.UID))
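
The renamed cleanupOrphanedPodDirs keeps the same shape: build a set of desired pod UIDs, then remove anything on disk whose name is not in the set. A generic sketch of that reconcile-against-a-set pattern (the directory names here are made up for illustration):

package main

import "fmt"

func main() {
	desired := map[string]bool{"uid-1": true, "uid-2": true}
	onDisk := []string{"uid-1", "uid-2", "uid-stale"}
	for _, dir := range onDisk {
		if !desired[dir] {
			// The kubelet would os.RemoveAll the orphaned directory here.
			fmt.Println("would remove orphaned dir:", dir)
		}
	}
}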
@@ -1452,48 +1462,25 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 		return nil
 	}
 
-	// Kill any containers we don't need.
-	killed := []string{}
-	for _, pod := range runningPods {
-		if _, found := desiredPods[pod.ID]; found {
-			// syncPod() will handle this one.
-			continue
-		}
-		// Kill all the containers in the unidentified pod.
-		for _, c := range pod.Containers {
-			// call the networking plugin for teardown
-			if c.Name == dockertools.PodInfraContainerName {
-				err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(c.ID))
-				if err != nil {
-					glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
-				}
-			}
-			glog.V(1).Infof("Killing unwanted container %+v", c)
-			err = kl.killContainer(c)
-			if err != nil {
-				glog.Errorf("Error killing container %+v: %v", c, err)
-			} else {
-				killed = append(killed, string(c.ID))
-			}
-		}
-	}
-	running, err := kl.containerManager.GetRunningContainers(killed)
+	// Kill containers associated with unwanted pods and get a list of
+	// unwanted containers that are still running.
+	running, err := kl.killUnwantedPods(desiredPods, runningPods)
 	if err != nil {
-		glog.Errorf("Failed to poll container state: %v", err)
+		glog.Errorf("Failed killing unwanted containers: %v", err)
 		return err
 	}
 
 	// Remove any orphaned volumes.
 	err = kl.cleanupOrphanedVolumes(pods, running)
 	if err != nil {
 		glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
 		return err
 	}
 
-	// Remove any orphaned pods.
-	err = kl.cleanupOrphanedPods(pods)
+	// Remove any orphaned pod directories.
+	err = kl.cleanupOrphanedPodDirs(pods)
 	if err != nil {
 		glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
 		return err
 	}
@@ -1503,6 +1490,67 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 	return err
 }
 
+// killUnwantedPods kills the unwanted, running pods in parallel, and returns
+// containers in those pods that it failed to terminate.
+func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
+	runningPods []*kubecontainer.Pod) ([]*docker.Container, error) {
+	type result struct {
+		containers []*docker.Container
+		err        error
+	}
+	ch := make(chan result, len(runningPods))
+	defer close(ch)
+	numWorkers := 0
+	for _, pod := range runningPods {
+		if _, found := desiredPods[pod.ID]; found {
+			// Per-pod workers will handle the desired pods.
+			continue
+		}
+		numWorkers++
+		go func(pod *kubecontainer.Pod, ch chan result) {
+			defer func() {
+				// Send the IDs of the containers that we failed to kill.
+				containers, err := kl.getRunningContainersByPod(pod)
+				ch <- result{containers: containers, err: err}
+			}()
+			glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
+			// Stop the containers.
+			err := kl.killPod(*pod)
+			if err != nil {
+				glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
+				return
+			}
+			// Remove the pod directory.
+			err = os.RemoveAll(kl.getPodDir(pod.ID))
+			if err != nil {
+				glog.Errorf("Failed removing pod directory for %q", pod.Name)
+				return
+			}
+		}(pod, ch)
+	}
+
+	// Aggregate results from the pod killing workers.
+	var errs []error
+	var running []*docker.Container
+	for i := 0; i < numWorkers; i++ {
+		m := <-ch
+		if m.err != nil {
+			errs = append(errs, m.err)
+			continue
+		}
+		running = append(running, m.containers...)
+	}
+	return running, utilErrors.NewAggregate(errs)
+}
+
+func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) {
+	containerIDs := make([]string, len(pod.Containers))
+	for i, c := range pod.Containers {
+		containerIDs[i] = string(c.ID)
+	}
+	return kl.containerManager.GetRunningContainers(containerIDs)
+}
+
 type podsByCreationTime []*api.Pod
 
 func (s podsByCreationTime) Len() int {
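
The deferred send in each killUnwantedPods worker is what makes the counted fan-in safe: every goroutine delivers exactly one result message on every exit path, including the early returns after a failed kill. A sketch isolating that guarantee, with work as a hypothetical placeholder for the per-pod cleanup:

package main

import "fmt"

// work is a hypothetical per-pod task that can fail.
func work(id int) (string, error) {
	if id == 1 {
		return "", fmt.Errorf("pod %d: kill failed", id)
	}
	return fmt.Sprintf("pod %d cleaned", id), nil
}

func main() {
	type result struct {
		msg string
		err error
	}
	ch := make(chan result, 3)
	for i := 0; i < 3; i++ {
		go func(i int) {
			var r result
			// The deferred send runs on every exit path, so the
			// receiver can rely on exactly one result per worker.
			defer func() { ch <- r }()
			msg, err := work(i)
			if err != nil {
				r.err = err
				return // early return still sends via defer
			}
			r.msg = msg
		}(i)
	}
	for i := 0; i < 3; i++ {
		if r := <-ch; r.err != nil {
			fmt.Println("error:", r.err)
		} else {
			fmt.Println(r.msg)
		}
	}
}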
@@ -1627,7 +1675,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 			unsyncedPod = false
 		}
 	}
 	pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
 	if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
 		glog.Errorf("Couldn't sync containers: %v", err)