mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Kubelet: parallelize cleaning up containers in unwanted pods
Kubelet kills unwanted pods in SyncPods, which directly impacts the latency of a sync iteration. This change parallelizes the cleanup to lessen the effect. Eventually, we should leverage per-pod workers for cleanup, with the exception of truly orphaned pods.
This commit is contained in:
parent
7c092e1867
commit
275002173e
@ -375,7 +375,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
|
func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
|
||||||
result := []*docker.Container{}
|
var result []*docker.Container
|
||||||
if dm.client == nil {
|
if dm.client == nil {
|
||||||
return nil, fmt.Errorf("unexpected nil docker client.")
|
return nil, fmt.Errorf("unexpected nil docker client.")
|
||||||
}
|
}
|
||||||
|
@ -937,17 +937,27 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Kill all running containers in a pod (includes the pod infra container).
|
// Kill all running containers in a pod (includes the pod infra container).
|
||||||
func (kl *Kubelet) killPod(runningPod kubecontainer.Pod) error {
|
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
|
||||||
// Send the kills in parallel since they may take a long time.
|
// Send the kills in parallel since they may take a long time.
|
||||||
errs := make(chan error, len(runningPod.Containers))
|
errs := make(chan error, len(pod.Containers))
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for _, container := range runningPod.Containers {
|
for _, container := range pod.Containers {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(container *kubecontainer.Container) {
|
go func(container *kubecontainer.Container) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
|
// Call the networking plugin for teardown.
|
||||||
|
// TODO: Handle this without signaling the pod infra container to
|
||||||
|
// adapt to the generic container runtime.
|
||||||
|
if container.Name == dockertools.PodInfraContainerName {
|
||||||
|
err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(container.ID))
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed tearing down the infra container: %v", err)
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
err := kl.killContainer(container)
|
err := kl.killContainer(container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
|
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
|
||||||
errs <- err
|
errs <- err
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
@ -1330,7 +1340,7 @@ func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
|
|||||||
return desiredVolumes
|
return desiredVolumes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) cleanupOrphanedPods(pods []*api.Pod) error {
|
func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {
|
||||||
desired := util.NewStringSet()
|
desired := util.NewStringSet()
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
desired.Insert(string(pod.UID))
|
desired.Insert(string(pod.UID))
|
||||||
@ -1452,48 +1462,25 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kill any containers we don't need.
|
// Kill containers associated with unwanted pods and get a list of
|
||||||
killed := []string{}
|
// unwanted containers that are still running.
|
||||||
for _, pod := range runningPods {
|
running, err := kl.killUnwantedPods(desiredPods, runningPods)
|
||||||
if _, found := desiredPods[pod.ID]; found {
|
|
||||||
// syncPod() will handle this one.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Kill all the containers in the unidentified pod.
|
|
||||||
for _, c := range pod.Containers {
|
|
||||||
// call the networking plugin for teardown
|
|
||||||
if c.Name == dockertools.PodInfraContainerName {
|
|
||||||
err := kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(c.ID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Network plugin pre-delete method returned an error: %v", err)
|
glog.Errorf("Failed killing unwanted containers: %v", err)
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.V(1).Infof("Killing unwanted container %+v", c)
|
|
||||||
err = kl.killContainer(c)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Error killing container %+v: %v", c, err)
|
|
||||||
} else {
|
|
||||||
killed = append(killed, string(c.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
running, err := kl.containerManager.GetRunningContainers(killed)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to poll container state: %v", err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove any orphaned volumes.
|
// Remove any orphaned volumes.
|
||||||
err = kl.cleanupOrphanedVolumes(pods, running)
|
err = kl.cleanupOrphanedVolumes(pods, running)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove any orphaned pods.
|
// Remove any orphaned pod directories.
|
||||||
err = kl.cleanupOrphanedPods(pods)
|
err = kl.cleanupOrphanedPodDirs(pods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1503,6 +1490,67 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// killUnwantedPods kills the unwanted, running pods in parallel, and returns
|
||||||
|
// containers in those pods that it failed to terminate.
|
||||||
|
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
|
||||||
|
runningPods []*kubecontainer.Pod) ([]*docker.Container, error) {
|
||||||
|
type result struct {
|
||||||
|
containers []*docker.Container
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
ch := make(chan result, len(runningPods))
|
||||||
|
defer close(ch)
|
||||||
|
numWorkers := 0
|
||||||
|
for _, pod := range runningPods {
|
||||||
|
if _, found := desiredPods[pod.ID]; found {
|
||||||
|
// Per-pod workers will handle the desired pods.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
numWorkers++
|
||||||
|
go func(pod *kubecontainer.Pod, ch chan result) {
|
||||||
|
defer func() {
|
||||||
|
// Send the IDs of the containers that we failed to kill.
|
||||||
|
containers, err := kl.getRunningContainersByPod(pod)
|
||||||
|
ch <- result{containers: containers, err: err}
|
||||||
|
}()
|
||||||
|
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
|
||||||
|
// Stop the containers.
|
||||||
|
err := kl.killPod(*pod)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Remove the pod directory.
|
||||||
|
err = os.RemoveAll(kl.getPodDir(pod.ID))
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed removing pod directory for %q", pod.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(pod, ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate results from the pod killing workers.
|
||||||
|
var errs []error
|
||||||
|
var running []*docker.Container
|
||||||
|
for i := 0; i < numWorkers; i++ {
|
||||||
|
m := <-ch
|
||||||
|
if m.err != nil {
|
||||||
|
errs = append(errs, m.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
running = append(running, m.containers...)
|
||||||
|
}
|
||||||
|
return running, utilErrors.NewAggregate(errs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) {
|
||||||
|
containerIDs := make([]string, len(pod.Containers))
|
||||||
|
for i, c := range pod.Containers {
|
||||||
|
containerIDs[i] = string(c.ID)
|
||||||
|
}
|
||||||
|
return kl.containerManager.GetRunningContainers(containerIDs)
|
||||||
|
}
|
||||||
|
|
||||||
type podsByCreationTime []*api.Pod
|
type podsByCreationTime []*api.Pod
|
||||||
|
|
||||||
func (s podsByCreationTime) Len() int {
|
func (s podsByCreationTime) Len() int {
|
||||||
@ -1627,7 +1675,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
|||||||
unsyncedPod = false
|
unsyncedPod = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
|
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
|
||||||
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
||||||
glog.Errorf("Couldn't sync containers: %v", err)
|
glog.Errorf("Couldn't sync containers: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user