diff --git a/Cargo.lock b/Cargo.lock
index 2a6f8c29d1..6b8fd10441 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8679,6 +8679,7 @@ dependencies = [
  "slog-scope",
  "strum 0.24.1",
  "tokio",
+ "tokio-util",
  "tracing",
  "url",
  "uuid 1.23.1",
diff --git a/Cargo.toml b/Cargo.toml
index 7803baa1c5..593174bf37 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -201,6 +201,7 @@ tdx = "0.1.1"
 tempfile = "3.19.1"
 thiserror = "1.0.26"
 tokio = "1.46.1"
+tokio-util = "0.7.17"
 tokio-vsock = "0.3.4"
 toml = "0.5.8"
 tracing = "0.1.44"
diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs
index 5e9fce6605..4268dd4495 100644
--- a/src/runtime-rs/crates/runtimes/common/src/error.rs
+++ b/src/runtime-rs/crates/runtimes/common/src/error.rs
@@ -4,6 +4,8 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 
+use nix::libc;
+
 use crate::types::{ContainerProcess, SandboxResponse, TaskResponse};
 
 #[derive(thiserror::Error, Debug)]
@@ -21,3 +23,68 @@ pub enum Error {
     #[error("process already terminated")]
     ProcessAlreadyTerminated,
 }
+
+/// Lowercase message fragments that identify a connection closed by the peer.
+///
+/// Entries must stay lowercase: messages are lowercased before the substring match.
+const NORMAL_OOM_SHUTDOWN_MESSAGES: &[&str] = &[
+    "connection reset by peer",
+    "broken pipe",
+    "transport endpoint is not connected",
+];
+
+/// Lowercase message fragments reported when a process or container is missing.
+///
+/// Entries must stay lowercase: messages are lowercased before the substring match.
+const NO_SUCH_PROCESS_MESSAGES: &[&str] = &[
+    "no such process",
+    "process not found",
+    "init process not found",
+    "cannot find init process",
+];
+
+/// Returns `true` if the lowercased message of `cause` contains any of `patterns`.
+///
+/// `patterns` must already be lowercase.
+fn message_contains_any(cause: &dyn std::error::Error, patterns: &[&str]) -> bool {
+    let message = cause.to_string().to_lowercase();
+    patterns.iter().any(|pattern| message.contains(*pattern))
+}
+
+/// Returns `true` if `err` indicates that the connection was closed or reset by the peer.
+///
+/// Every cause in the error chain is inspected rather than only the outermost error,
+/// because `anyhow::Error::to_string()` yields just the outermost context message (for
+/// example "get oom event") and would hide the underlying disconnect. A cause matches
+/// when it is a `std::io::Error` of kind `ConnectionReset`, `BrokenPipe` or
+/// `NotConnected`, or when its message contains one of `NORMAL_OOM_SHUTDOWN_MESSAGES`
+/// (case-insensitive).
+pub fn is_normal_oom_shutdown_error(err: &anyhow::Error) -> bool {
+    err.chain().any(|cause| {
+        let is_disconnect_kind = matches!(
+            cause.downcast_ref::<std::io::Error>().map(|io_err| io_err.kind()),
+            Some(
+                std::io::ErrorKind::ConnectionReset
+                    | std::io::ErrorKind::BrokenPipe
+                    | std::io::ErrorKind::NotConnected
+            )
+        );
+        is_disconnect_kind || message_contains_any(cause, NORMAL_OOM_SHUTDOWN_MESSAGES)
+    })
+}
+
+/// Returns `true` if `err` indicates that the target process/container no longer exists.
+///
+/// Used to decide whether an operation, like signaling a process, failed only because the
+/// target is no longer available. Every cause in the error chain is inspected: a cause
+/// matches when it is a `std::io::Error` whose OS error code is `ESRCH` or `ENOENT`, or
+/// when its message contains one of `NO_SUCH_PROCESS_MESSAGES` (case-insensitive).
+pub fn is_no_such_process_error(err: &anyhow::Error) -> bool {
+    err.chain().any(|cause| {
+        let errno = cause
+            .downcast_ref::<std::io::Error>()
+            .and_then(|io_err| io_err.raw_os_error());
+        matches!(errno, Some(libc::ESRCH | libc::ENOENT))
+            || message_contains_any(cause, NO_SUCH_PROCESS_MESSAGES)
+    })
+}
diff --git a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
index 74ff8a3163..78a26de56c 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
+++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
@@ -19,6 +19,7 @@ serde_json = { workspace = true }
 slog = { workspace = true }
 slog-scope = { workspace = true }
 tokio = { workspace = true }
+tokio-util = { workspace = true }
 url = { workspace = true }
 tracing = { workspace = true }
 oci-spec = { workspace = true }
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs
index cc5b7647b6..311ee3bc1b 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs
@@ -10,7 +10,7 @@ use std::sync::Arc;
 use agent::Agent;
 use anyhow::{anyhow, Context, Result};
 use common::{
-    error::Error,
+    error::{is_no_such_process_error, Error},
     types::{
         ContainerConfig, ContainerID, ContainerProcess, ProcessStateInfo, ProcessStatus,
         ProcessType,
@@ -467,7 +467,20 @@ impl Container {
             return Ok(());
         }
 
-        inner.signal_process(container_process, signal, all).await
+        match inner.signal_process(container_process, signal, all).await {
+            Ok(()) => Ok(()),
+            Err(e) if is_term_signal && is_no_such_process_error(&e) => {
+                info!(
+                    self.logger,
+                    "process already gone during kill, treating as success";
+                    "container" => &self.container_id.container_id,
+                    "process" => ?container_process,
+                    "signal" => signal
+                );
+                Ok(())
+            }
+            Err(e) => Err(e),
+        }
     }
 
     pub async fn exec_process(
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
index 4eb54472ee..fdbed86c66 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
@@ -7,7 +7,7 @@
 use agent::Agent;
 use anyhow::{anyhow, Context, Result};
 use common::{
-    error::Error,
+    error::{is_no_such_process_error, Error},
     types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
 };
 use hypervisor::device::device_manager::DeviceManager;
@@ -241,16 +241,31 @@ impl ContainerInner {
             .await
             .context("check state")?;
 
-        // if use force mode to stop container, stop always successful
-        // send kill signal to container
-        // ignore the error of sending signal, since the process would
-        // have been killed and exited yet.
-        self.signal_process(process, Signal::SIGKILL as u32, false)
+        // Send kill signal to the container's init process.
+        //
+        // We must never abort teardown when signaling fails: the process may
+        // already have exited (or the agent connection may already be gone
+        // during sandbox shutdown), and in all of these cases we still have to
+        // clean up the container's resources. Failing to do so leaves the
+        // container's mounts in place, which later makes the sandbox-level
+        // virtiofs cleanup fail with "Resource busy"/"Directory not empty".
+        //
+        // Errors indicating the process is already gone are logged at info
+        // level; any other failure is logged as a warning, but cleanup always
+        // proceeds.
+        if let Err(e) = self
+            .signal_process(process, Signal::SIGKILL as u32, false)
             .await
-            .map_err(|e| {
-                warn!(logger, "failed to signal kill. {:?}", e);
-            })
-            .ok();
+        {
+            if is_no_such_process_error(&e) {
+                info!(
+                    logger,
+                    "signal_process: init process is already gone, treat it as stopped"
+                );
+            } else {
+                warn!(logger, "failed to send kill signal to process: {:?}", e);
+            }
+        }
 
         match process.process_type {
             ProcessType::Container => self
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
index 94ee975f67..9850a4cd4c 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
@@ -12,6 +12,7 @@ use agent::{
 };
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
+use common::error::is_normal_oom_shutdown_error;
 use common::types::utils::option_system_time_into;
 use common::types::ContainerProcess;
 use common::{
@@ -78,6 +79,7 @@ use std::sync::Arc;
 use std::time::SystemTime;
 use strum::Display;
 use tokio::sync::{mpsc::Sender, watch, Mutex, RwLock};
+use tokio_util::sync::CancellationToken;
 use tracing::instrument;
 
 pub(crate) const VIRTCONTAINER: &str = "virt_container";
@@ -133,6 +135,7 @@ pub struct VirtSandbox {
     sandbox_config: Option,
     shm_size: u64,
     factory: Option,
+    cancel_token: CancellationToken,
 }
 
 impl std::fmt::Debug for VirtSandbox {
@@ -165,6 +168,7 @@ impl VirtSandbox {
         let config = resource_manager.config().await;
         let keep_abnormal = config.runtime.keep_abnormal;
         let (exit_notify_tx, _) = watch::channel(false);
+        let cancel_token = CancellationToken::new();
         Ok(Self {
             sid: sid.to_string(),
             msg_sender: Arc::new(Mutex::new(msg_sender)),
@@ -177,6 +181,7 @@ impl VirtSandbox {
             shm_size: sandbox_config.shm_size,
             sandbox_config: Some(sandbox_config),
             factory: Some(factory),
+            cancel_token,
         })
     }
 
@@ -891,37 +896,51 @@ impl Sandbox for VirtSandbox {
         let agent = self.agent.clone();
         let sender = self.msg_sender.clone();
 
+        let cancel_token = self.cancel_token.clone();
+
         info!(sl!(), "oom watcher start");
         tokio::spawn(async move {
             loop {
-                match agent
-                    .get_oom_event(agent::Empty::new())
-                    .await
-                    .context("get oom event")
-                {
-                    Ok(resp) => {
-                        let cid = &resp.container_id;
-                        warn!(sl!(), "send oom event for container {}", &cid);
-                        let event = TaskOOM {
-                            container_id: cid.to_string(),
-                            ..Default::default()
-                        };
-                        let msg = Message::new(Action::Event(Arc::new(event)));
-                        let lock_sender = sender.lock().await;
-                        if let Err(err) = lock_sender.send(msg).await.context("send event") {
-                            error!(
-                                sl!(),
-                                "failed to send oom event for {} error {:?}", cid, err
-                            );
-                        }
-                    }
-                    Err(err) => {
-                        warn!(sl!(), "failed to get oom event error {:?}", err);
+                tokio::select! {
+                    _ = cancel_token.cancelled() => {
+                        // Sandbox or VM is shutting down, gracefully exit watcher
+                        info!(sl!(), "oom watcher cancelled, sandbox is stopping");
                         break;
                     }
+                    res = agent.get_oom_event(agent::Empty::new()) => {
+                        match res.context("get oom event") {
+                            Ok(resp) => {
+                                let cid = &resp.container_id;
+                                warn!(sl!(), "send oom event for container {}", &cid);
+                                let event = TaskOOM {
+                                    container_id: cid.to_string(),
+                                    ..Default::default()
+                                };
+                                let msg = Message::new(Action::Event(Arc::new(event)));
+                                let lock_sender = sender.lock().await;
+                                if let Err(err) = lock_sender.send(msg).await.context("send event") {
+                                    error!(
+                                        sl!(),
+                                        "failed to send oom event for {} error {:?}", cid, err
+                                    );
+                                }
+                            }
+                            Err(err) => {
+                                // Handle errors by type
+                                if is_normal_oom_shutdown_error(&err) {
+                                    info!(sl!(), "oom watcher exit on sandbox shutdown: {:?}", err);
+                                    break;
+                                } else {
+                                    warn!(sl!(), "failed to get oom event error {:?}", err);
+                                    continue;
+                                }
+                            }
+                        }
+                    }
                 }
             }
         });
+
         self.monitor.start(id, self.agent.clone());
         self.save().await.context("save state")?;
 
@@ -1040,6 +1059,10 @@ impl Sandbox for VirtSandbox {
             return Ok(());
         }
 
+        // Cancel the OOM watcher before tearing down the VM so it exits
+        // cleanly instead of hitting ECONNRESET/EOF on a closed channel.
+        self.cancel_token.cancel();
+
         info!(sl!(), "begin stop sandbox");
         if state == SandboxState::Init {
             let _ = self.hypervisor.stop_vm().await;
@@ -1335,6 +1358,7 @@ impl Persist for VirtSandbox {
             sandbox_config: None,
             shm_size: DEFAULT_SHM_SIZE,
             factory: None,
+            cancel_token: CancellationToken::default(),
         })
     }
 }