diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index 3772a8a7cd..0e6f80a4f6 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -184,7 +184,6 @@ pub enum ProcessStatus { Stopped = 3, Paused = 4, Pausing = 5, - Exited = 6, } #[derive(Debug, Clone)] diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index 841805bb0e..5f758f5676 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -56,7 +56,6 @@ impl From for api::Status { ProcessStatus::Stopped => api::Status::STOPPED, ProcessStatus::Paused => api::Status::PAUSED, ProcessStatus::Pausing => api::Status::PAUSING, - ProcessStatus::Exited => api::Status::STOPPED, } } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index c7e7378b1d..efab56a682 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -186,7 +186,11 @@ impl Container { Ok(()) } - pub async fn start(&self, process: &ContainerProcess) -> Result<()> { + pub async fn start( + &self, + containers: Arc>>, + process: &ContainerProcess, + ) -> Result<()> { let mut inner = self.inner.write().await; match process.process_type { ProcessType::Container => { @@ -199,7 +203,7 @@ impl Container { let container_io = inner.new_container_io(process).await?; inner .init_process - .start_io_and_wait(self.agent.clone(), container_io) + .start_io_and_wait(containers, self.agent.clone(), container_io) .await?; } ProcessType::Exec => { @@ -232,7 +236,7 @@ impl Container { 
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?; exec.process - .start_io_and_wait(self.agent.clone(), container_io) + .start_io_and_wait(containers, self.agent.clone(), container_io) .await .context("start io and wait")?; } @@ -287,10 +291,7 @@ impl Container { all: bool, ) -> Result<()> { let mut inner = self.inner.write().await; - let device_manager = self.resource_manager.get_device_manager().await; - inner - .signal_process(container_process, signal, all, &device_manager) - .await + inner.signal_process(container_process, signal, all).await } pub async fn exec_process( diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index bc478cbcdc..8c5e766d82 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -166,7 +166,12 @@ impl ContainerInner { } } - async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> { + async fn cleanup_container( + &mut self, + cid: &str, + force: bool, + device_manager: &RwLock, + ) -> Result<()> { // wait until the container process // terminated and the status write lock released. 
info!(self.logger, "wait on container terminated"); @@ -195,6 +200,14 @@ impl ContainerInner { // close the exit channel to wakeup wait service // send to notify watchers who are waiting for the process exit self.init_process.stop().await; + + self.clean_volumes(device_manager) + .await + .context("clean volumes")?; + self.clean_rootfs(device_manager) + .await + .context("clean rootfs")?; + Ok(()) } @@ -213,26 +226,24 @@ impl ContainerInner { return Ok(()); } - self.check_state(vec![ProcessStatus::Running, ProcessStatus::Exited]) + self.check_state(vec![ProcessStatus::Running]) .await .context("check state")?; - if state == ProcessStatus::Running { - // if use force mode to stop container, stop always successful - // send kill signal to container - // ignore the error of sending signal, since the process would - // have been killed and exited yet. - self.signal_process(process, Signal::SIGKILL as u32, false, device_manager) - .await - .map_err(|e| { - warn!(logger, "failed to signal kill. {:?}", e); - }) - .ok(); - } + // if use force mode to stop container, stop always successful + // send kill signal to container + // ignore the error of sending signal, since the process would + // have been killed and exited yet. + self.signal_process(process, Signal::SIGKILL as u32, false) + .await + .map_err(|e| { + warn!(logger, "failed to signal kill. 
{:?}", e); + }) + .ok(); match process.process_type { ProcessType::Container => self - .cleanup_container(&process.container_id.container_id, force) + .cleanup_container(&process.container_id.container_id, force, device_manager) .await .context("stop container")?, ProcessType::Exec => { @@ -252,7 +263,6 @@ impl ContainerInner { process: &ContainerProcess, signal: u32, all: bool, - device_manager: &RwLock, ) -> Result<()> { let mut process_id: agent::ContainerProcessID = process.clone().into(); if all { @@ -264,13 +274,6 @@ impl ContainerInner { .signal_process(agent::SignalProcessRequest { process_id, signal }) .await?; - self.clean_volumes(device_manager) - .await - .context("clean volumes")?; - self.clean_rootfs(device_manager) - .await - .context("clean rootfs")?; - Ok(()) } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs index d900632cee..f6a6553e97 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs @@ -241,13 +241,6 @@ impl ContainerManager for VirtContainerManager { info!(logger, "wait process exit status {:?}", status); - // stop process - let containers = self.containers.read().await; - let container_id = &process.container_id.container_id; - let c = containers - .get(container_id) - .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; - c.stop_process(process).await.context("stop container")?; Ok(status.clone()) } @@ -258,7 +251,9 @@ impl ContainerManager for VirtContainerManager { let c = containers .get(container_id) .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; - c.start(process).await.context("start")?; + c.start(self.containers.clone(), process) + .await + .context("start")?; // Poststart Hooks: // * should be run in runtime namespace diff --git 
a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 32856f27c3..cd73134dd2 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -4,21 +4,19 @@ // SPDX-License-Identifier: Apache-2.0 // +use std::collections::HashMap; use std::sync::Arc; use agent::Agent; use anyhow::{Context, Result}; use awaitgroup::{WaitGroup, Worker as WaitGroupWorker}; use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::{watch, RwLock}, -}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::sync::{watch, RwLock}; -use super::{ - io::{ContainerIo, ShimIo}, - logger_with_process, -}; +use super::container::Container; +use super::io::{ContainerIo, ShimIo}; +use super::logger_with_process; pub type ProcessWatcher = ( Option>, @@ -83,6 +81,7 @@ impl Process { pub async fn start_io_and_wait( &mut self, + containers: Arc>>, agent: Arc, container_io: ContainerIo, ) -> Result<()> { @@ -118,7 +117,9 @@ impl Process { } } - self.run_io_wait(agent, wg).await.context("run io thread")?; + self.run_io_wait(containers, agent, wg) + .await + .context("run io thread")?; Ok(()) } @@ -148,7 +149,15 @@ impl Process { Ok(()) } - async fn run_io_wait(&mut self, agent: Arc, mut wg: WaitGroup) -> Result<()> { + /// A container is considered exited once its IO has ended. + /// This function waits for the IO to end, and then does some + /// cleanup. 
+ async fn run_io_wait( + &mut self, + containers: Arc>>, + agent: Arc, + mut wg: WaitGroup, + ) -> Result<()> { let logger = self.logger.clone(); info!(logger, "start run io wait"); let process = self.process.clone(); @@ -177,12 +186,32 @@ impl Process { info!(logger, "end wait process exit code {}", resp.status); + let containers = containers.read().await; + let container_id = &process.container_id.container_id; + let c = match containers.get(container_id) { + Some(c) => c, + None => { + error!( + logger, + "Failed to stop process, since container {} not found", container_id + ); + return; + } + }; + + if let Err(err) = c.stop_process(&process).await { + error!( + logger, + "Failed to stop process, process = {:?}, err = {:?}", process, err + ); + } + let mut exit_status = exit_status.write().await; exit_status.update_exit_code(resp.status); drop(exit_status); let mut status = status.write().await; - *status = ProcessStatus::Exited; + *status = ProcessStatus::Stopped; drop(status); drop(exit_notifier);