diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index f785c94d9a..a4603eebf1 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -26,9 +26,8 @@ use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; use nix::pty; use nix::sys::select::{select, FdSet}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; -use nix::sys::wait::{self, WaitStatus}; +use nix::sys::wait; use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; -use prctl::set_child_subreaper; use std::collections::HashMap; use std::env; use std::ffi::{CStr, CString, OsStr}; @@ -52,6 +51,7 @@ mod network; mod pci; pub mod random; mod sandbox; +mod signal; #[cfg(test)] mod test_utils; mod uevent; @@ -60,6 +60,7 @@ mod version; use mount::{cgroups_mount, general_mount}; use sandbox::Sandbox; +use signal::setup_signal_handler; use slog::Logger; use uevent::watch_uevents; @@ -70,7 +71,6 @@ use futures::StreamExt as _; use rustjail::pipestream::PipeStream; use tokio::{ io::AsyncWrite, - signal::unix::{signal, SignalKind}, sync::{ oneshot::Sender, watch::{channel, Receiver}, @@ -237,7 +237,7 @@ async fn real_main() -> std::result::Result<(), Box> { } // Start the sandbox and wait for its ttRPC server to end - start_sandbox(&logger, &config, init_mode).await?; + start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?; // Install a NOP logger for the remainder of the shutdown sequence // to ensure any log calls made by local crates using the scope logger @@ -296,7 +296,13 @@ fn main() -> std::result::Result<(), Box> { rt.block_on(real_main()) } -async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> { +async fn start_sandbox( + logger: &Logger, + config: &AgentConfig, + init_mode: bool, + tasks: &mut Vec>>, + shutdown: Receiver, +) -> Result<()> { let shells = SHELLS.clone(); let debug_console_vport = config.debug_console_vport as u32; @@ -326,9 +332,14 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) - let sandbox = Arc::new(Mutex::new(s)); - setup_signal_handler(&logger, sandbox.clone()) - .await - .unwrap(); + let signal_handler_task = tokio::spawn(setup_signal_handler( + logger.clone(), + sandbox.clone(), + shutdown.clone(), + )); + + tasks.push(signal_handler_task); + watch_uevents(sandbox.clone()).await; let (tx, rx) = tokio::sync::oneshot::channel(); @@ -348,93 +359,6 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) - Ok(()) } -use nix::sys::wait::WaitPidFlag; - -async fn setup_signal_handler(logger: &Logger, sandbox: Arc>) -> Result<()> { - let logger = logger.new(o!("subsystem" => "signals")); - - set_child_subreaper(true) - .map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?; - - let mut signal_stream = signal(SignalKind::child())?; - - tokio::spawn(async move { - 'outer: loop { - signal_stream.recv().await; - info!(logger, "received signal"; "signal" => "SIGCHLD"); - - // sevral signals can be combined together - // as one. So loop around to reap all - // exited children - 'inner: loop { - let wait_status = match wait::waitpid( - Some(Pid::from_raw(-1)), - Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL), - ) { - Ok(s) => { - if s == WaitStatus::StillAlive { - continue 'outer; - } - s - } - Err(e) => { - info!( - logger, - "waitpid reaper failed"; - "error" => e.as_errno().unwrap().desc() - ); - continue 'outer; - } - }; - info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status)); - - let pid = wait_status.pid(); - if let Some(pid) = pid { - let raw_pid = pid.as_raw(); - let child_pid = format!("{}", raw_pid); - - let logger = logger.new(o!("child-pid" => child_pid)); - - let mut sandbox = sandbox.lock().await; - let process = sandbox.find_process(raw_pid); - if process.is_none() { - info!(logger, "child exited unexpectedly"); - continue 'inner; - } - - let mut p = process.unwrap(); - - if p.exit_pipe_w.is_none() { - error!(logger, "the process's exit_pipe_w isn't set"); - continue 'inner; - } - let pipe_write = p.exit_pipe_w.unwrap(); - let ret: i32; - - match wait_status { - WaitStatus::Exited(_, c) => ret = c, - WaitStatus::Signaled(_, sig, _) => ret = sig as i32, - _ => { - info!(logger, "got wrong status for process"; - "child-status" => format!("{:?}", wait_status)); - continue 'inner; - } - } - - p.exit_code = ret; - let _ = unistd::close(pipe_write); - - info!(logger, "notify term to close"); - // close the socket file to notify readStdio to close terminal specifically - // in case this process's terminal has been inherited by its children. - p.notify_term_close(); - } - } - } - }); - Ok(()) -} - // init_agent_as_init will do the initializations such as setting up the rootfs // when this agent has been run as the init process. fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> { diff --git a/src/agent/src/signal.rs b/src/agent/src/signal.rs new file mode 100644 index 0000000000..283951117e --- /dev/null +++ b/src/agent/src/signal.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2019-2020 Ant Financial +// Copyright (c) 2020 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use crate::sandbox::Sandbox; +use anyhow::{anyhow, Result}; +use nix::sys::wait::WaitPidFlag; +use nix::sys::wait::{self, WaitStatus}; +use nix::unistd; +use prctl::set_child_subreaper; +use slog::{error, info, o, Logger}; +use std::sync::Arc; +use tokio::select; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::watch::Receiver; +use tokio::sync::Mutex; +use unistd::Pid; + +async fn handle_sigchild(logger: Logger, sandbox: Arc>) -> Result<()> { + info!(logger, "handling signal"; "signal" => "SIGCHLD"); + + loop { + let result = wait::waitpid( + Some(Pid::from_raw(-1)), + Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL), + ); + + let wait_status = match result { + Ok(s) => { + if s == WaitStatus::StillAlive { + return Ok(()); + } + s + } + Err(e) => return Err(anyhow!(e).context("waitpid reaper failed")), + }; + + info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status)); + + if let Some(pid) = wait_status.pid() { + let raw_pid = pid.as_raw(); + let child_pid = format!("{}", raw_pid); + + let logger = logger.new(o!("child-pid" => child_pid)); + + let sandbox_ref = sandbox.clone(); + let mut sandbox = sandbox_ref.lock().await; + + let process = sandbox.find_process(raw_pid); + if process.is_none() { + info!(logger, "child exited unexpectedly"); + continue; + } + + let mut p = process.unwrap(); + + if p.exit_pipe_w.is_none() { + info!(logger, "process exit pipe not set"); + continue; + } + + let pipe_write = p.exit_pipe_w.unwrap(); + let ret: i32; + + match wait_status { + WaitStatus::Exited(_, c) => ret = c, + WaitStatus::Signaled(_, sig, _) => ret = sig as i32, + _ => { + info!(logger, "got wrong status for process"; + "child-status" => format!("{:?}", wait_status)); + continue; + } + } + + p.exit_code = ret; + let _ = unistd::close(pipe_write); + + info!(logger, "notify term to close"); + // close the socket file to notify readStdio to close terminal specifically + // in case this process's terminal has been inherited by its children. + p.notify_term_close(); + } + } +} + +pub async fn setup_signal_handler( + logger: Logger, + sandbox: Arc>, + mut shutdown: Receiver, +) -> Result<()> { + let logger = logger.new(o!("subsystem" => "signals")); + + set_child_subreaper(true) + .map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?; + + let mut sigchild_stream = signal(SignalKind::child())?; + + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "got shutdown request"); + break; + } + + _ = sigchild_stream.recv() => { + let result = handle_sigchild(logger.clone(), sandbox.clone()).await; + + match result { + Ok(()) => (), + Err(e) => { + // Log errors, but don't abort - just wait for more signals! + error!(logger, "failed to handle signal"; "error" => format!("{:?}", e)); + } + } + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::pin; + use tokio::sync::watch::channel; + use tokio::time::Duration; + + #[tokio::test] + async fn test_setup_signal_handler() { + let logger = slog::Logger::root(slog::Discard, o!()); + let s = Sandbox::new(&logger).unwrap(); + + let sandbox = Arc::new(Mutex::new(s)); + + let (tx, rx) = channel(true); + + let handle = tokio::spawn(setup_signal_handler(logger, sandbox, rx)); + + let timeout = tokio::time::sleep(Duration::from_secs(1)); + pin!(timeout); + + tx.send(true).expect("failed to request shutdown"); + + loop { + select! { + _ = handle => { + println!("INFO: task completed"); + break; + }, + _ = &mut timeout => { + panic!("signal thread failed to stop"); + } + } + } + } +}