Merge pull request #12293 from Apokleos/graceful-errors

runtime-rs: make OOM watcher and signal handling lifecycle-aware
This commit is contained in:
Fabiano Fidêncio
2026-06-16 15:02:54 +02:00
committed by GitHub
7 changed files with 152 additions and 35 deletions

1
Cargo.lock generated
View File

@@ -8679,6 +8679,7 @@ dependencies = [
"slog-scope",
"strum 0.24.1",
"tokio",
"tokio-util",
"tracing",
"url",
"uuid 1.23.1",

View File

@@ -201,6 +201,7 @@ tdx = "0.1.1"
tempfile = "3.19.1"
thiserror = "1.0.26"
tokio = "1.46.1"
tokio-util = "0.7.17"
tokio-vsock = "0.3.4"
toml = "0.5.8"
tracing = "0.1.44"

View File

@@ -4,6 +4,8 @@
// SPDX-License-Identifier: Apache-2.0
//
use nix::libc;
use crate::types::{ContainerProcess, SandboxResponse, TaskResponse};
#[derive(thiserror::Error, Debug)]
@@ -21,3 +23,63 @@ pub enum Error {
#[error("process already terminated")]
ProcessAlreadyTerminated,
}
/// Common error messages indicating normal OOM shutdowns due to network issues.
const NORMAL_OOM_SHUTDOWN_MESSAGES: &[&str] = &[
"Connection reset by peer",
"Broken pipe",
"transport endpoint is not connected",
];
/// Checks if an error indicates a normal oom shutdown due to network disconnections.
///
/// This function identifies errors that commonly occur when a connection is gracefully
/// or unexpectedly terminated by the peer, such as network interruptions or the remote
/// end closing the connection.
pub fn is_normal_oom_shutdown_error(err: &anyhow::Error) -> bool {
// Check for common I/O error kinds that indicate connection issues.
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
match io_err.kind() {
std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::BrokenPipe
| std::io::ErrorKind::NotConnected => return true,
_ => {}
}
}
// Additionally, check the error message for specific substrings.
let error_string = err.to_string().to_lowercase();
NORMAL_OOM_SHUTDOWN_MESSAGES
.iter()
.any(|pattern| error_string.contains(&pattern.to_lowercase()))
}
/// List of common error message patterns indicating that a process or container is missing.
const NO_SUCH_PROCESS_MESSAGES: &[&str] = &[
"no such process",
"process not found",
"init process not found",
"cannot find init process",
];
/// Returns `true` if the error indicates that the target process/container no longer exists.
/// This is used to determine if an operation, like signaling a process, failed because the
/// target is no longer available.
/// The function checks for standard OS error codes (`ESRCH`, `ENOENT`) and common error message patterns.
pub fn is_no_such_process_error(err: &anyhow::Error) -> bool {
// Check for standard OS error codes.
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
if let Some(raw_os_error) = io_err.raw_os_error() {
// standard "no such process" error.
if raw_os_error == libc::ESRCH || raw_os_error == libc::ENOENT {
return true;
}
}
}
// Fallback to checking the error message for known patterns.
let error_string = err.to_string().to_lowercase();
NO_SUCH_PROCESS_MESSAGES
.iter()
.any(|pattern| error_string.contains(&pattern.to_lowercase()))
}

View File

@@ -19,6 +19,7 @@ serde_json = { workspace = true }
slog = { workspace = true }
slog-scope = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }
tracing = { workspace = true }
oci-spec = { workspace = true }

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
error::Error,
error::{is_no_such_process_error, Error},
types::{
ContainerConfig, ContainerID, ContainerProcess, ProcessStateInfo, ProcessStatus,
ProcessType,
@@ -467,7 +467,20 @@ impl Container {
return Ok(());
}
inner.signal_process(container_process, signal, all).await
match inner.signal_process(container_process, signal, all).await {
Ok(()) => Ok(()),
Err(e) if is_term_signal && is_no_such_process_error(&e) => {
info!(
self.logger,
"process already gone during kill, treating as success";
"container" => &self.container_id.container_id,
"process" => ?container_process,
"signal" => signal
);
Ok(())
}
Err(e) => Err(e),
}
}
pub async fn exec_process(

View File

@@ -7,7 +7,7 @@
use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
error::Error,
error::{is_no_such_process_error, Error},
types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
};
use hypervisor::device::device_manager::DeviceManager;
@@ -241,16 +241,31 @@ impl ContainerInner {
.await
.context("check state")?;
// if use force mode to stop container, stop always successful
// send kill signal to container
// ignore the error of sending signal, since the process would
// have been killed and exited yet.
self.signal_process(process, Signal::SIGKILL as u32, false)
// Send kill signal to the container's init process.
//
// We must never abort teardown when signaling fails: the process may
// already have exited (or the agent connection may already be gone
// during sandbox shutdown), and in all of these cases we still have to
// clean up the container's resources. Failing to do so leaves the
// container's mounts in place, which later makes the sandbox-level
// virtiofs cleanup fail with "Resource busy"/"Directory not empty".
//
// Errors indicating the process is already gone are logged at info
// level; any other failure is logged as a warning, but cleanup always
// proceeds.
if let Err(e) = self
.signal_process(process, Signal::SIGKILL as u32, false)
.await
.map_err(|e| {
warn!(logger, "failed to signal kill. {:?}", e);
})
.ok();
{
if is_no_such_process_error(&e) {
info!(
logger,
"signal_process: init process is already gone, treat it as stopped"
);
} else {
warn!(logger, "failed to send kill signal to process: {:?}", e);
}
}
match process.process_type {
ProcessType::Container => self

View File

@@ -12,6 +12,7 @@ use agent::{
};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use common::error::is_normal_oom_shutdown_error;
use common::types::utils::option_system_time_into;
use common::types::ContainerProcess;
use common::{
@@ -78,6 +79,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use strum::Display;
use tokio::sync::{mpsc::Sender, watch, Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::instrument;
pub(crate) const VIRTCONTAINER: &str = "virt_container";
@@ -133,6 +135,7 @@ pub struct VirtSandbox {
sandbox_config: Option<SandboxConfig>,
shm_size: u64,
factory: Option<Factory>,
cancel_token: CancellationToken,
}
impl std::fmt::Debug for VirtSandbox {
@@ -165,6 +168,7 @@ impl VirtSandbox {
let config = resource_manager.config().await;
let keep_abnormal = config.runtime.keep_abnormal;
let (exit_notify_tx, _) = watch::channel(false);
let cancel_token = CancellationToken::new();
Ok(Self {
sid: sid.to_string(),
msg_sender: Arc::new(Mutex::new(msg_sender)),
@@ -177,6 +181,7 @@ impl VirtSandbox {
shm_size: sandbox_config.shm_size,
sandbox_config: Some(sandbox_config),
factory: Some(factory),
cancel_token,
})
}
@@ -891,37 +896,51 @@ impl Sandbox for VirtSandbox {
let agent = self.agent.clone();
let sender = self.msg_sender.clone();
let cancel_token = self.cancel_token.clone();
info!(sl!(), "oom watcher start");
tokio::spawn(async move {
loop {
match agent
.get_oom_event(agent::Empty::new())
.await
.context("get oom event")
{
Ok(resp) => {
let cid = &resp.container_id;
warn!(sl!(), "send oom event for container {}", &cid);
let event = TaskOOM {
container_id: cid.to_string(),
..Default::default()
};
let msg = Message::new(Action::Event(Arc::new(event)));
let lock_sender = sender.lock().await;
if let Err(err) = lock_sender.send(msg).await.context("send event") {
error!(
sl!(),
"failed to send oom event for {} error {:?}", cid, err
);
}
}
Err(err) => {
warn!(sl!(), "failed to get oom event error {:?}", err);
tokio::select! {
_ = cancel_token.cancelled() => {
// Sandbox or VM is shutting down, gracefully exit watcher
info!(sl!(), "oom watcher cancelled, sandbox is stopping");
break;
}
res = agent.get_oom_event(agent::Empty::new()) => {
match res.context("get oom event") {
Ok(resp) => {
let cid = &resp.container_id;
warn!(sl!(), "send oom event for container {}", &cid);
let event = TaskOOM {
container_id: cid.to_string(),
..Default::default()
};
let msg = Message::new(Action::Event(Arc::new(event)));
let lock_sender = sender.lock().await;
if let Err(err) = lock_sender.send(msg).await.context("send event") {
error!(
sl!(),
"failed to send oom event for {} error {:?}", cid, err
);
}
}
Err(err) => {
// Handle errors by type
if is_normal_oom_shutdown_error(&err) {
info!(sl!(), "oom watcher exit on sandbox shutdown: {:?}", err);
break;
} else {
warn!(sl!(), "failed to get oom event error {:?}", err);
continue;
}
}
}
}
}
}
});
self.monitor.start(id, self.agent.clone());
self.save().await.context("save state")?;
@@ -1040,6 +1059,10 @@ impl Sandbox for VirtSandbox {
return Ok(());
}
// Cancel the OOM watcher before tearing down the VM so it exits
// cleanly instead of hitting ECONNRESET/EOF on a closed channel.
self.cancel_token.cancel();
info!(sl!(), "begin stop sandbox");
if state == SandboxState::Init {
let _ = self.hypervisor.stop_vm().await;
@@ -1335,6 +1358,7 @@ impl Persist for VirtSandbox {
sandbox_config: None,
shm_size: DEFAULT_SHM_SIZE,
factory: None,
cancel_token: CancellationToken::default(),
})
}
}