runtime-rs: Fix volumes and rootfs cleanup issues

There are several ways a container can exit:

- Non-detach mode: `Wait` request is sent by containerd, then
  `wait_process()` will be called eventually.
- Detach mode: `Wait` request is not sent, so `wait_process()` won’t be
  called.
    - Killed by ctr: For example, a container runs `tail -f /dev/null`, and
      is killed by `sudo ctr t kill -a -s SIGTERM <CID>`. Kill request is
      sent, then `kill_process()` will be called. User executes `sudo ctr c
      rm <CID>`, `Delete` request is sent, then `delete_process()` will be
      called.
    - Exited on its own: For example, a container runs `sleep 1s`. The
      container’s state goes to `Stopped` after 1 second. The user then
      executes the delete command as described above.

Where should the container cleanup be done?

- `wait_process()`: No, because it won’t be called in detach mode.
- `delete_process()`: No, because it depends on when the user executes the
  delete command.
- `run_io_wait()`: Yes. A container is considered exited once its IO has
  ended, and this function is always called once a container is launched.

Fixes: #7713

Signed-off-by: Jianyong Wu <jianyong.wu@arm.com>
Signed-off-by: Xuewei Niu <niuxuewei.nxw@antgroup.com>
This commit is contained in:
Xuewei Niu 2023-08-22 10:26:34 +08:00
parent 8032797418
commit 268e846558
6 changed files with 77 additions and 51 deletions

View File

@ -184,7 +184,6 @@ pub enum ProcessStatus {
Stopped = 3, Stopped = 3,
Paused = 4, Paused = 4,
Pausing = 5, Pausing = 5,
Exited = 6,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@ -56,7 +56,6 @@ impl From<ProcessStatus> for api::Status {
ProcessStatus::Stopped => api::Status::STOPPED, ProcessStatus::Stopped => api::Status::STOPPED,
ProcessStatus::Paused => api::Status::PAUSED, ProcessStatus::Paused => api::Status::PAUSED,
ProcessStatus::Pausing => api::Status::PAUSING, ProcessStatus::Pausing => api::Status::PAUSING,
ProcessStatus::Exited => api::Status::STOPPED,
} }
} }
} }

View File

@ -186,7 +186,11 @@ impl Container {
Ok(()) Ok(())
} }
pub async fn start(&self, process: &ContainerProcess) -> Result<()> { pub async fn start(
&self,
containers: Arc<RwLock<HashMap<String, Container>>>,
process: &ContainerProcess,
) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
match process.process_type { match process.process_type {
ProcessType::Container => { ProcessType::Container => {
@ -199,7 +203,7 @@ impl Container {
let container_io = inner.new_container_io(process).await?; let container_io = inner.new_container_io(process).await?;
inner inner
.init_process .init_process
.start_io_and_wait(self.agent.clone(), container_io) .start_io_and_wait(containers, self.agent.clone(), container_io)
.await?; .await?;
} }
ProcessType::Exec => { ProcessType::Exec => {
@ -232,7 +236,7 @@ impl Container {
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?; .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process exec.process
.start_io_and_wait(self.agent.clone(), container_io) .start_io_and_wait(containers, self.agent.clone(), container_io)
.await .await
.context("start io and wait")?; .context("start io and wait")?;
} }
@ -287,10 +291,7 @@ impl Container {
all: bool, all: bool,
) -> Result<()> { ) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
let device_manager = self.resource_manager.get_device_manager().await; inner.signal_process(container_process, signal, all).await
inner
.signal_process(container_process, signal, all, &device_manager)
.await
} }
pub async fn exec_process( pub async fn exec_process(

View File

@ -166,7 +166,12 @@ impl ContainerInner {
} }
} }
async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> { async fn cleanup_container(
&mut self,
cid: &str,
force: bool,
device_manager: &RwLock<DeviceManager>,
) -> Result<()> {
// wait until the container process // wait until the container process
// terminated and the status write lock released. // terminated and the status write lock released.
info!(self.logger, "wait on container terminated"); info!(self.logger, "wait on container terminated");
@ -195,6 +200,14 @@ impl ContainerInner {
// close the exit channel to wakeup wait service // close the exit channel to wakeup wait service
// send to notify watchers who are waiting for the process exit // send to notify watchers who are waiting for the process exit
self.init_process.stop().await; self.init_process.stop().await;
self.clean_volumes(device_manager)
.await
.context("clean volumes")?;
self.clean_rootfs(device_manager)
.await
.context("clean rootfs")?;
Ok(()) Ok(())
} }
@ -213,26 +226,24 @@ impl ContainerInner {
return Ok(()); return Ok(());
} }
self.check_state(vec![ProcessStatus::Running, ProcessStatus::Exited]) self.check_state(vec![ProcessStatus::Running])
.await .await
.context("check state")?; .context("check state")?;
if state == ProcessStatus::Running { // if use force mode to stop container, stop always successful
// if use force mode to stop container, stop always successful // send kill signal to container
// send kill signal to container // ignore the error of sending signal, since the process would
// ignore the error of sending signal, since the process would // have been killed and exited yet.
// have been killed and exited yet. self.signal_process(process, Signal::SIGKILL as u32, false)
self.signal_process(process, Signal::SIGKILL as u32, false, device_manager) .await
.await .map_err(|e| {
.map_err(|e| { warn!(logger, "failed to signal kill. {:?}", e);
warn!(logger, "failed to signal kill. {:?}", e); })
}) .ok();
.ok();
}
match process.process_type { match process.process_type {
ProcessType::Container => self ProcessType::Container => self
.cleanup_container(&process.container_id.container_id, force) .cleanup_container(&process.container_id.container_id, force, device_manager)
.await .await
.context("stop container")?, .context("stop container")?,
ProcessType::Exec => { ProcessType::Exec => {
@ -252,7 +263,6 @@ impl ContainerInner {
process: &ContainerProcess, process: &ContainerProcess,
signal: u32, signal: u32,
all: bool, all: bool,
device_manager: &RwLock<DeviceManager>,
) -> Result<()> { ) -> Result<()> {
let mut process_id: agent::ContainerProcessID = process.clone().into(); let mut process_id: agent::ContainerProcessID = process.clone().into();
if all { if all {
@ -264,13 +274,6 @@ impl ContainerInner {
.signal_process(agent::SignalProcessRequest { process_id, signal }) .signal_process(agent::SignalProcessRequest { process_id, signal })
.await?; .await?;
self.clean_volumes(device_manager)
.await
.context("clean volumes")?;
self.clean_rootfs(device_manager)
.await
.context("clean rootfs")?;
Ok(()) Ok(())
} }

View File

@ -241,13 +241,6 @@ impl ContainerManager for VirtContainerManager {
info!(logger, "wait process exit status {:?}", status); info!(logger, "wait process exit status {:?}", status);
// stop process
let containers = self.containers.read().await;
let container_id = &process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.stop_process(process).await.context("stop container")?;
Ok(status.clone()) Ok(status.clone())
} }
@ -258,7 +251,9 @@ impl ContainerManager for VirtContainerManager {
let c = containers let c = containers
.get(container_id) .get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.start(process).await.context("start")?; c.start(self.containers.clone(), process)
.await
.context("start")?;
// Poststart Hooks: // Poststart Hooks:
// * should be run in runtime namespace // * should be run in runtime namespace

View File

@ -4,21 +4,19 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use agent::Agent; use agent::Agent;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker}; use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID}; use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
use tokio::{ use tokio::io::{AsyncRead, AsyncWrite};
io::{AsyncRead, AsyncWrite}, use tokio::sync::{watch, RwLock};
sync::{watch, RwLock},
};
use super::{ use super::container::Container;
io::{ContainerIo, ShimIo}, use super::io::{ContainerIo, ShimIo};
logger_with_process, use super::logger_with_process;
};
pub type ProcessWatcher = ( pub type ProcessWatcher = (
Option<watch::Receiver<bool>>, Option<watch::Receiver<bool>>,
@ -83,6 +81,7 @@ impl Process {
pub async fn start_io_and_wait( pub async fn start_io_and_wait(
&mut self, &mut self,
containers: Arc<RwLock<HashMap<String, Container>>>,
agent: Arc<dyn Agent>, agent: Arc<dyn Agent>,
container_io: ContainerIo, container_io: ContainerIo,
) -> Result<()> { ) -> Result<()> {
@ -118,7 +117,9 @@ impl Process {
} }
} }
self.run_io_wait(agent, wg).await.context("run io thread")?; self.run_io_wait(containers, agent, wg)
.await
.context("run io thread")?;
Ok(()) Ok(())
} }
@ -148,7 +149,15 @@ impl Process {
Ok(()) Ok(())
} }
async fn run_io_wait(&mut self, agent: Arc<dyn Agent>, mut wg: WaitGroup) -> Result<()> { /// A container is considered exited once its IO ended.
/// This function waits for IO to end. And then, do some cleanup
/// things.
async fn run_io_wait(
&mut self,
containers: Arc<RwLock<HashMap<String, Container>>>,
agent: Arc<dyn Agent>,
mut wg: WaitGroup,
) -> Result<()> {
let logger = self.logger.clone(); let logger = self.logger.clone();
info!(logger, "start run io wait"); info!(logger, "start run io wait");
let process = self.process.clone(); let process = self.process.clone();
@ -177,12 +186,32 @@ impl Process {
info!(logger, "end wait process exit code {}", resp.status); info!(logger, "end wait process exit code {}", resp.status);
let containers = containers.read().await;
let container_id = &process.container_id.container_id;
let c = match containers.get(container_id) {
Some(c) => c,
None => {
error!(
logger,
"Failed to stop process, since container {} not found", container_id
);
return;
}
};
if let Err(err) = c.stop_process(&process).await {
error!(
logger,
"Failed to stop process, process = {:?}, err = {:?}", process, err
);
}
let mut exit_status = exit_status.write().await; let mut exit_status = exit_status.write().await;
exit_status.update_exit_code(resp.status); exit_status.update_exit_code(resp.status);
drop(exit_status); drop(exit_status);
let mut status = status.write().await; let mut status = status.write().await;
*status = ProcessStatus::Exited; *status = ProcessStatus::Stopped;
drop(status); drop(status);
drop(exit_notifier); drop(exit_notifier);