signal: Move to a new module

Move the signal handling code into a new module and refactor into the
main handler and a new SIGCHLD handling function to make the code
simpler and easier to understand.

Also added a unit test for shutdown.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-03-22 11:20:50 +00:00
parent 011f7d785a
commit d8d5b4cd1d
2 changed files with 178 additions and 95 deletions

View File

@ -26,9 +26,8 @@ use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty;
use nix::sys::select::{select, FdSet};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::wait::{self, WaitStatus};
use nix::sys::wait;
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
use prctl::set_child_subreaper;
use std::collections::HashMap;
use std::env;
use std::ffi::{CStr, CString, OsStr};
@ -52,6 +51,7 @@ mod network;
mod pci;
pub mod random;
mod sandbox;
mod signal;
#[cfg(test)]
mod test_utils;
mod uevent;
@ -60,6 +60,7 @@ mod version;
use mount::{cgroups_mount, general_mount};
use sandbox::Sandbox;
use signal::setup_signal_handler;
use slog::Logger;
use uevent::watch_uevents;
@ -70,7 +71,6 @@ use futures::StreamExt as _;
use rustjail::pipestream::PipeStream;
use tokio::{
io::AsyncWrite,
signal::unix::{signal, SignalKind},
sync::{
oneshot::Sender,
watch::{channel, Receiver},
@ -237,7 +237,7 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
// Start the sandbox and wait for its ttRPC server to end
start_sandbox(&logger, &config, init_mode).await?;
start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?;
// Install a NOP logger for the remainder of the shutdown sequence
// to ensure any log calls made by local crates using the scope logger
@ -296,7 +296,13 @@ fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
rt.block_on(real_main())
}
async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> {
async fn start_sandbox(
logger: &Logger,
config: &AgentConfig,
init_mode: bool,
tasks: &mut Vec<JoinHandle<Result<()>>>,
shutdown: Receiver<bool>,
) -> Result<()> {
let shells = SHELLS.clone();
let debug_console_vport = config.debug_console_vport as u32;
@ -326,9 +332,14 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
let sandbox = Arc::new(Mutex::new(s));
setup_signal_handler(&logger, sandbox.clone())
.await
.unwrap();
let signal_handler_task = tokio::spawn(setup_signal_handler(
logger.clone(),
sandbox.clone(),
shutdown.clone(),
));
tasks.push(signal_handler_task);
watch_uevents(sandbox.clone()).await;
let (tx, rx) = tokio::sync::oneshot::channel();
@ -348,93 +359,6 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
Ok(())
}
use nix::sys::wait::WaitPidFlag;
async fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
let logger = logger.new(o!("subsystem" => "signals"));
set_child_subreaper(true)
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
let mut signal_stream = signal(SignalKind::child())?;
tokio::spawn(async move {
'outer: loop {
signal_stream.recv().await;
info!(logger, "received signal"; "signal" => "SIGCHLD");
// sevral signals can be combined together
// as one. So loop around to reap all
// exited children
'inner: loop {
let wait_status = match wait::waitpid(
Some(Pid::from_raw(-1)),
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
) {
Ok(s) => {
if s == WaitStatus::StillAlive {
continue 'outer;
}
s
}
Err(e) => {
info!(
logger,
"waitpid reaper failed";
"error" => e.as_errno().unwrap().desc()
);
continue 'outer;
}
};
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
let pid = wait_status.pid();
if let Some(pid) = pid {
let raw_pid = pid.as_raw();
let child_pid = format!("{}", raw_pid);
let logger = logger.new(o!("child-pid" => child_pid));
let mut sandbox = sandbox.lock().await;
let process = sandbox.find_process(raw_pid);
if process.is_none() {
info!(logger, "child exited unexpectedly");
continue 'inner;
}
let mut p = process.unwrap();
if p.exit_pipe_w.is_none() {
error!(logger, "the process's exit_pipe_w isn't set");
continue 'inner;
}
let pipe_write = p.exit_pipe_w.unwrap();
let ret: i32;
match wait_status {
WaitStatus::Exited(_, c) => ret = c,
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
_ => {
info!(logger, "got wrong status for process";
"child-status" => format!("{:?}", wait_status));
continue 'inner;
}
}
p.exit_code = ret;
let _ = unistd::close(pipe_write);
info!(logger, "notify term to close");
// close the socket file to notify readStdio to close terminal specifically
// in case this process's terminal has been inherited by its children.
p.notify_term_close();
}
}
}
});
Ok(())
}
// init_agent_as_init will do the initializations such as setting up the rootfs
// when this agent has been run as the init process.
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {

159
src/agent/src/signal.rs Normal file
View File

@ -0,0 +1,159 @@
// Copyright (c) 2019-2020 Ant Financial
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::sandbox::Sandbox;
use anyhow::{anyhow, Result};
use nix::sys::wait::WaitPidFlag;
use nix::sys::wait::{self, WaitStatus};
use nix::unistd;
use prctl::set_child_subreaper;
use slog::{error, info, o, Logger};
use std::sync::Arc;
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex;
use unistd::Pid;
async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
info!(logger, "handling signal"; "signal" => "SIGCHLD");
loop {
let result = wait::waitpid(
Some(Pid::from_raw(-1)),
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
);
let wait_status = match result {
Ok(s) => {
if s == WaitStatus::StillAlive {
return Ok(());
}
s
}
Err(e) => return Err(anyhow!(e).context("waitpid reaper failed")),
};
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
if let Some(pid) = wait_status.pid() {
let raw_pid = pid.as_raw();
let child_pid = format!("{}", raw_pid);
let logger = logger.new(o!("child-pid" => child_pid));
let sandbox_ref = sandbox.clone();
let mut sandbox = sandbox_ref.lock().await;
let process = sandbox.find_process(raw_pid);
if process.is_none() {
info!(logger, "child exited unexpectedly");
continue;
}
let mut p = process.unwrap();
if p.exit_pipe_w.is_none() {
info!(logger, "process exit pipe not set");
continue;
}
let pipe_write = p.exit_pipe_w.unwrap();
let ret: i32;
match wait_status {
WaitStatus::Exited(_, c) => ret = c,
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
_ => {
info!(logger, "got wrong status for process";
"child-status" => format!("{:?}", wait_status));
continue;
}
}
p.exit_code = ret;
let _ = unistd::close(pipe_write);
info!(logger, "notify term to close");
// close the socket file to notify readStdio to close terminal specifically
// in case this process's terminal has been inherited by its children.
p.notify_term_close();
}
}
}
pub async fn setup_signal_handler(
logger: Logger,
sandbox: Arc<Mutex<Sandbox>>,
mut shutdown: Receiver<bool>,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "signals"));
set_child_subreaper(true)
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
let mut sigchild_stream = signal(SignalKind::child())?;
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
}
_ = sigchild_stream.recv() => {
let result = handle_sigchild(logger.clone(), sandbox.clone()).await;
match result {
Ok(()) => (),
Err(e) => {
// Log errors, but don't abort - just wait for more signals!
error!(logger, "failed to handle signal"; "error" => format!("{:?}", e));
}
}
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::pin;
use tokio::sync::watch::channel;
use tokio::time::Duration;
#[tokio::test]
async fn test_setup_signal_handler() {
let logger = slog::Logger::root(slog::Discard, o!());
let s = Sandbox::new(&logger).unwrap();
let sandbox = Arc::new(Mutex::new(s));
let (tx, rx) = channel(true);
let handle = tokio::spawn(setup_signal_handler(logger, sandbox, rx));
let timeout = tokio::time::sleep(Duration::from_secs(1));
pin!(timeout);
tx.send(true).expect("failed to request shutdown");
loop {
select! {
_ = handle => {
println!("INFO: task completed");
break;
},
_ = &mut timeout => {
panic!("signal thread failed to stop");
}
}
}
}
}