mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 20:24:31 +00:00
runtime-rs: Fix volumes and rootfs cleanup issues
There are several paths for container exit: - Non-detach mode: `Wait` request is sent by containerd, then `wait_process()` will be called eventually. - Detach mode: `Wait` request is not sent, so `wait_process()` won’t be called. - Killed by ctr: For example, a container runs `tail -f /dev/null`, and is killed by `sudo ctr t kill -a -s SIGTERM <CID>`. Kill request is sent, then `kill_process()` will be called. User executes `sudo ctr c rm <CID>`, `Delete` request is sent, then `delete_process()` will be called. - Exited on its own: For example, a container runs `sleep 1s`. The container’s state goes to `Stopped` after 1 second. User executes the delete command as above. Where should we do the container cleanup? - `wait_process()`: No, because it won’t be called in detach mode. - `delete_process()`: No, because it depends on when the user executes the delete command. - `run_io_wait()`: Yes. A container is considered exited once its IO has ended, and this function is always called once a container is launched. Fixes: #7713 Signed-off-by: Jianyong Wu <jianyong.wu@arm.com> Signed-off-by: Xuewei Niu <niuxuewei.nxw@antgroup.com>
This commit is contained in:
parent
8032797418
commit
268e846558
src/runtime-rs/crates/runtimes
common/src/types
virt_container/src/container_manager
@ -184,7 +184,6 @@ pub enum ProcessStatus {
|
||||
Stopped = 3,
|
||||
Paused = 4,
|
||||
Pausing = 5,
|
||||
Exited = 6,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -56,7 +56,6 @@ impl From<ProcessStatus> for api::Status {
|
||||
ProcessStatus::Stopped => api::Status::STOPPED,
|
||||
ProcessStatus::Paused => api::Status::PAUSED,
|
||||
ProcessStatus::Pausing => api::Status::PAUSING,
|
||||
ProcessStatus::Exited => api::Status::STOPPED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -186,7 +186,11 @@ impl Container {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(&self, process: &ContainerProcess) -> Result<()> {
|
||||
pub async fn start(
|
||||
&self,
|
||||
containers: Arc<RwLock<HashMap<String, Container>>>,
|
||||
process: &ContainerProcess,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
match process.process_type {
|
||||
ProcessType::Container => {
|
||||
@ -199,7 +203,7 @@ impl Container {
|
||||
let container_io = inner.new_container_io(process).await?;
|
||||
inner
|
||||
.init_process
|
||||
.start_io_and_wait(self.agent.clone(), container_io)
|
||||
.start_io_and_wait(containers, self.agent.clone(), container_io)
|
||||
.await?;
|
||||
}
|
||||
ProcessType::Exec => {
|
||||
@ -232,7 +236,7 @@ impl Container {
|
||||
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
|
||||
|
||||
exec.process
|
||||
.start_io_and_wait(self.agent.clone(), container_io)
|
||||
.start_io_and_wait(containers, self.agent.clone(), container_io)
|
||||
.await
|
||||
.context("start io and wait")?;
|
||||
}
|
||||
@ -287,10 +291,7 @@ impl Container {
|
||||
all: bool,
|
||||
) -> Result<()> {
|
||||
let mut inner = self.inner.write().await;
|
||||
let device_manager = self.resource_manager.get_device_manager().await;
|
||||
inner
|
||||
.signal_process(container_process, signal, all, &device_manager)
|
||||
.await
|
||||
inner.signal_process(container_process, signal, all).await
|
||||
}
|
||||
|
||||
pub async fn exec_process(
|
||||
|
@ -166,7 +166,12 @@ impl ContainerInner {
|
||||
}
|
||||
}
|
||||
|
||||
async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> {
|
||||
async fn cleanup_container(
|
||||
&mut self,
|
||||
cid: &str,
|
||||
force: bool,
|
||||
device_manager: &RwLock<DeviceManager>,
|
||||
) -> Result<()> {
|
||||
// wait until the container process
|
||||
// terminated and the status write lock released.
|
||||
info!(self.logger, "wait on container terminated");
|
||||
@ -195,6 +200,14 @@ impl ContainerInner {
|
||||
// close the exit channel to wakeup wait service
|
||||
// send to notify watchers who are waiting for the process exit
|
||||
self.init_process.stop().await;
|
||||
|
||||
self.clean_volumes(device_manager)
|
||||
.await
|
||||
.context("clean volumes")?;
|
||||
self.clean_rootfs(device_manager)
|
||||
.await
|
||||
.context("clean rootfs")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -213,26 +226,24 @@ impl ContainerInner {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.check_state(vec![ProcessStatus::Running, ProcessStatus::Exited])
|
||||
self.check_state(vec![ProcessStatus::Running])
|
||||
.await
|
||||
.context("check state")?;
|
||||
|
||||
if state == ProcessStatus::Running {
|
||||
// if use force mode to stop container, stop always successful
|
||||
// send kill signal to container
|
||||
// ignore the error of sending signal, since the process would
|
||||
// have been killed and exited yet.
|
||||
self.signal_process(process, Signal::SIGKILL as u32, false, device_manager)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
warn!(logger, "failed to signal kill. {:?}", e);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
// if use force mode to stop container, stop always successful
|
||||
// send kill signal to container
|
||||
// ignore the error of sending signal, since the process would
|
||||
// have been killed and exited yet.
|
||||
self.signal_process(process, Signal::SIGKILL as u32, false)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
warn!(logger, "failed to signal kill. {:?}", e);
|
||||
})
|
||||
.ok();
|
||||
|
||||
match process.process_type {
|
||||
ProcessType::Container => self
|
||||
.cleanup_container(&process.container_id.container_id, force)
|
||||
.cleanup_container(&process.container_id.container_id, force, device_manager)
|
||||
.await
|
||||
.context("stop container")?,
|
||||
ProcessType::Exec => {
|
||||
@ -252,7 +263,6 @@ impl ContainerInner {
|
||||
process: &ContainerProcess,
|
||||
signal: u32,
|
||||
all: bool,
|
||||
device_manager: &RwLock<DeviceManager>,
|
||||
) -> Result<()> {
|
||||
let mut process_id: agent::ContainerProcessID = process.clone().into();
|
||||
if all {
|
||||
@ -264,13 +274,6 @@ impl ContainerInner {
|
||||
.signal_process(agent::SignalProcessRequest { process_id, signal })
|
||||
.await?;
|
||||
|
||||
self.clean_volumes(device_manager)
|
||||
.await
|
||||
.context("clean volumes")?;
|
||||
self.clean_rootfs(device_manager)
|
||||
.await
|
||||
.context("clean rootfs")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -241,13 +241,6 @@ impl ContainerManager for VirtContainerManager {
|
||||
|
||||
info!(logger, "wait process exit status {:?}", status);
|
||||
|
||||
// stop process
|
||||
let containers = self.containers.read().await;
|
||||
let container_id = &process.container_id.container_id;
|
||||
let c = containers
|
||||
.get(container_id)
|
||||
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
|
||||
c.stop_process(process).await.context("stop container")?;
|
||||
Ok(status.clone())
|
||||
}
|
||||
|
||||
@ -258,7 +251,9 @@ impl ContainerManager for VirtContainerManager {
|
||||
let c = containers
|
||||
.get(container_id)
|
||||
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
|
||||
c.start(process).await.context("start")?;
|
||||
c.start(self.containers.clone(), process)
|
||||
.await
|
||||
.context("start")?;
|
||||
|
||||
// Poststart Hooks:
|
||||
// * should be run in runtime namespace
|
||||
|
@ -4,21 +4,19 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use agent::Agent;
|
||||
use anyhow::{Context, Result};
|
||||
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
|
||||
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
sync::{watch, RwLock},
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::sync::{watch, RwLock};
|
||||
|
||||
use super::{
|
||||
io::{ContainerIo, ShimIo},
|
||||
logger_with_process,
|
||||
};
|
||||
use super::container::Container;
|
||||
use super::io::{ContainerIo, ShimIo};
|
||||
use super::logger_with_process;
|
||||
|
||||
pub type ProcessWatcher = (
|
||||
Option<watch::Receiver<bool>>,
|
||||
@ -83,6 +81,7 @@ impl Process {
|
||||
|
||||
pub async fn start_io_and_wait(
|
||||
&mut self,
|
||||
containers: Arc<RwLock<HashMap<String, Container>>>,
|
||||
agent: Arc<dyn Agent>,
|
||||
container_io: ContainerIo,
|
||||
) -> Result<()> {
|
||||
@ -118,7 +117,9 @@ impl Process {
|
||||
}
|
||||
}
|
||||
|
||||
self.run_io_wait(agent, wg).await.context("run io thread")?;
|
||||
self.run_io_wait(containers, agent, wg)
|
||||
.await
|
||||
.context("run io thread")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -148,7 +149,15 @@ impl Process {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_io_wait(&mut self, agent: Arc<dyn Agent>, mut wg: WaitGroup) -> Result<()> {
|
||||
/// A container is considered exited once its IO ended.
|
||||
/// This function waits for IO to end, and then does some cleanup
|
||||
/// work.
|
||||
async fn run_io_wait(
|
||||
&mut self,
|
||||
containers: Arc<RwLock<HashMap<String, Container>>>,
|
||||
agent: Arc<dyn Agent>,
|
||||
mut wg: WaitGroup,
|
||||
) -> Result<()> {
|
||||
let logger = self.logger.clone();
|
||||
info!(logger, "start run io wait");
|
||||
let process = self.process.clone();
|
||||
@ -177,12 +186,32 @@ impl Process {
|
||||
|
||||
info!(logger, "end wait process exit code {}", resp.status);
|
||||
|
||||
let containers = containers.read().await;
|
||||
let container_id = &process.container_id.container_id;
|
||||
let c = match containers.get(container_id) {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
error!(
|
||||
logger,
|
||||
"Failed to stop process, since container {} not found", container_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = c.stop_process(&process).await {
|
||||
error!(
|
||||
logger,
|
||||
"Failed to stop process, process = {:?}, err = {:?}", process, err
|
||||
);
|
||||
}
|
||||
|
||||
let mut exit_status = exit_status.write().await;
|
||||
exit_status.update_exit_code(resp.status);
|
||||
drop(exit_status);
|
||||
|
||||
let mut status = status.write().await;
|
||||
*status = ProcessStatus::Exited;
|
||||
*status = ProcessStatus::Stopped;
|
||||
drop(status);
|
||||
|
||||
drop(exit_notifier);
|
||||
|
Loading…
Reference in New Issue
Block a user