mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-28 19:54:35 +00:00
signal: Move to a new module
Move the signal handling code into a new module and refactor into the main handler and a new SIGCHLD handling function to make the code simpler and easier to understand. Also added a unit test for shutdown. Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
parent
011f7d785a
commit
d8d5b4cd1d
@ -26,9 +26,8 @@ use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
|
||||
use nix::pty;
|
||||
use nix::sys::select::{select, FdSet};
|
||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||
use nix::sys::wait::{self, WaitStatus};
|
||||
use nix::sys::wait;
|
||||
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
|
||||
use prctl::set_child_subreaper;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::ffi::{CStr, CString, OsStr};
|
||||
@ -52,6 +51,7 @@ mod network;
|
||||
mod pci;
|
||||
pub mod random;
|
||||
mod sandbox;
|
||||
mod signal;
|
||||
#[cfg(test)]
|
||||
mod test_utils;
|
||||
mod uevent;
|
||||
@ -60,6 +60,7 @@ mod version;
|
||||
|
||||
use mount::{cgroups_mount, general_mount};
|
||||
use sandbox::Sandbox;
|
||||
use signal::setup_signal_handler;
|
||||
use slog::Logger;
|
||||
use uevent::watch_uevents;
|
||||
|
||||
@ -70,7 +71,6 @@ use futures::StreamExt as _;
|
||||
use rustjail::pipestream::PipeStream;
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
signal::unix::{signal, SignalKind},
|
||||
sync::{
|
||||
oneshot::Sender,
|
||||
watch::{channel, Receiver},
|
||||
@ -237,7 +237,7 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
|
||||
// Start the sandbox and wait for its ttRPC server to end
|
||||
start_sandbox(&logger, &config, init_mode).await?;
|
||||
start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?;
|
||||
|
||||
// Install a NOP logger for the remainder of the shutdown sequence
|
||||
// to ensure any log calls made by local crates using the scope logger
|
||||
@ -296,7 +296,13 @@ fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
rt.block_on(real_main())
|
||||
}
|
||||
|
||||
async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> {
|
||||
async fn start_sandbox(
|
||||
logger: &Logger,
|
||||
config: &AgentConfig,
|
||||
init_mode: bool,
|
||||
tasks: &mut Vec<JoinHandle<Result<()>>>,
|
||||
shutdown: Receiver<bool>,
|
||||
) -> Result<()> {
|
||||
let shells = SHELLS.clone();
|
||||
let debug_console_vport = config.debug_console_vport as u32;
|
||||
|
||||
@ -326,9 +332,14 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
|
||||
|
||||
let sandbox = Arc::new(Mutex::new(s));
|
||||
|
||||
setup_signal_handler(&logger, sandbox.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let signal_handler_task = tokio::spawn(setup_signal_handler(
|
||||
logger.clone(),
|
||||
sandbox.clone(),
|
||||
shutdown.clone(),
|
||||
));
|
||||
|
||||
tasks.push(signal_handler_task);
|
||||
|
||||
watch_uevents(sandbox.clone()).await;
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
@ -348,93 +359,6 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
|
||||
Ok(())
|
||||
}
|
||||
|
||||
use nix::sys::wait::WaitPidFlag;
|
||||
|
||||
async fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
|
||||
let logger = logger.new(o!("subsystem" => "signals"));
|
||||
|
||||
set_child_subreaper(true)
|
||||
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
|
||||
|
||||
let mut signal_stream = signal(SignalKind::child())?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
'outer: loop {
|
||||
signal_stream.recv().await;
|
||||
info!(logger, "received signal"; "signal" => "SIGCHLD");
|
||||
|
||||
// sevral signals can be combined together
|
||||
// as one. So loop around to reap all
|
||||
// exited children
|
||||
'inner: loop {
|
||||
let wait_status = match wait::waitpid(
|
||||
Some(Pid::from_raw(-1)),
|
||||
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
|
||||
) {
|
||||
Ok(s) => {
|
||||
if s == WaitStatus::StillAlive {
|
||||
continue 'outer;
|
||||
}
|
||||
s
|
||||
}
|
||||
Err(e) => {
|
||||
info!(
|
||||
logger,
|
||||
"waitpid reaper failed";
|
||||
"error" => e.as_errno().unwrap().desc()
|
||||
);
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
|
||||
|
||||
let pid = wait_status.pid();
|
||||
if let Some(pid) = pid {
|
||||
let raw_pid = pid.as_raw();
|
||||
let child_pid = format!("{}", raw_pid);
|
||||
|
||||
let logger = logger.new(o!("child-pid" => child_pid));
|
||||
|
||||
let mut sandbox = sandbox.lock().await;
|
||||
let process = sandbox.find_process(raw_pid);
|
||||
if process.is_none() {
|
||||
info!(logger, "child exited unexpectedly");
|
||||
continue 'inner;
|
||||
}
|
||||
|
||||
let mut p = process.unwrap();
|
||||
|
||||
if p.exit_pipe_w.is_none() {
|
||||
error!(logger, "the process's exit_pipe_w isn't set");
|
||||
continue 'inner;
|
||||
}
|
||||
let pipe_write = p.exit_pipe_w.unwrap();
|
||||
let ret: i32;
|
||||
|
||||
match wait_status {
|
||||
WaitStatus::Exited(_, c) => ret = c,
|
||||
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
|
||||
_ => {
|
||||
info!(logger, "got wrong status for process";
|
||||
"child-status" => format!("{:?}", wait_status));
|
||||
continue 'inner;
|
||||
}
|
||||
}
|
||||
|
||||
p.exit_code = ret;
|
||||
let _ = unistd::close(pipe_write);
|
||||
|
||||
info!(logger, "notify term to close");
|
||||
// close the socket file to notify readStdio to close terminal specifically
|
||||
// in case this process's terminal has been inherited by its children.
|
||||
p.notify_term_close();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// init_agent_as_init will do the initializations such as setting up the rootfs
|
||||
// when this agent has been run as the init process.
|
||||
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
|
||||
|
159
src/agent/src/signal.rs
Normal file
159
src/agent/src/signal.rs
Normal file
@ -0,0 +1,159 @@
|
||||
// Copyright (c) 2019-2020 Ant Financial
|
||||
// Copyright (c) 2020 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use crate::sandbox::Sandbox;
|
||||
use anyhow::{anyhow, Result};
|
||||
use nix::sys::wait::WaitPidFlag;
|
||||
use nix::sys::wait::{self, WaitStatus};
|
||||
use nix::unistd;
|
||||
use prctl::set_child_subreaper;
|
||||
use slog::{error, info, o, Logger};
|
||||
use std::sync::Arc;
|
||||
use tokio::select;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::sync::watch::Receiver;
|
||||
use tokio::sync::Mutex;
|
||||
use unistd::Pid;
|
||||
|
||||
async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
|
||||
info!(logger, "handling signal"; "signal" => "SIGCHLD");
|
||||
|
||||
loop {
|
||||
let result = wait::waitpid(
|
||||
Some(Pid::from_raw(-1)),
|
||||
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
|
||||
);
|
||||
|
||||
let wait_status = match result {
|
||||
Ok(s) => {
|
||||
if s == WaitStatus::StillAlive {
|
||||
return Ok(());
|
||||
}
|
||||
s
|
||||
}
|
||||
Err(e) => return Err(anyhow!(e).context("waitpid reaper failed")),
|
||||
};
|
||||
|
||||
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
|
||||
|
||||
if let Some(pid) = wait_status.pid() {
|
||||
let raw_pid = pid.as_raw();
|
||||
let child_pid = format!("{}", raw_pid);
|
||||
|
||||
let logger = logger.new(o!("child-pid" => child_pid));
|
||||
|
||||
let sandbox_ref = sandbox.clone();
|
||||
let mut sandbox = sandbox_ref.lock().await;
|
||||
|
||||
let process = sandbox.find_process(raw_pid);
|
||||
if process.is_none() {
|
||||
info!(logger, "child exited unexpectedly");
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut p = process.unwrap();
|
||||
|
||||
if p.exit_pipe_w.is_none() {
|
||||
info!(logger, "process exit pipe not set");
|
||||
continue;
|
||||
}
|
||||
|
||||
let pipe_write = p.exit_pipe_w.unwrap();
|
||||
let ret: i32;
|
||||
|
||||
match wait_status {
|
||||
WaitStatus::Exited(_, c) => ret = c,
|
||||
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
|
||||
_ => {
|
||||
info!(logger, "got wrong status for process";
|
||||
"child-status" => format!("{:?}", wait_status));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
p.exit_code = ret;
|
||||
let _ = unistd::close(pipe_write);
|
||||
|
||||
info!(logger, "notify term to close");
|
||||
// close the socket file to notify readStdio to close terminal specifically
|
||||
// in case this process's terminal has been inherited by its children.
|
||||
p.notify_term_close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_signal_handler(
|
||||
logger: Logger,
|
||||
sandbox: Arc<Mutex<Sandbox>>,
|
||||
mut shutdown: Receiver<bool>,
|
||||
) -> Result<()> {
|
||||
let logger = logger.new(o!("subsystem" => "signals"));
|
||||
|
||||
set_child_subreaper(true)
|
||||
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
|
||||
|
||||
let mut sigchild_stream = signal(SignalKind::child())?;
|
||||
|
||||
loop {
|
||||
select! {
|
||||
_ = shutdown.changed() => {
|
||||
info!(logger, "got shutdown request");
|
||||
break;
|
||||
}
|
||||
|
||||
_ = sigchild_stream.recv() => {
|
||||
let result = handle_sigchild(logger.clone(), sandbox.clone()).await;
|
||||
|
||||
match result {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
// Log errors, but don't abort - just wait for more signals!
|
||||
error!(logger, "failed to handle signal"; "error" => format!("{:?}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::pin;
|
||||
use tokio::sync::watch::channel;
|
||||
use tokio::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_setup_signal_handler() {
|
||||
let logger = slog::Logger::root(slog::Discard, o!());
|
||||
let s = Sandbox::new(&logger).unwrap();
|
||||
|
||||
let sandbox = Arc::new(Mutex::new(s));
|
||||
|
||||
let (tx, rx) = channel(true);
|
||||
|
||||
let handle = tokio::spawn(setup_signal_handler(logger, sandbox, rx));
|
||||
|
||||
let timeout = tokio::time::sleep(Duration::from_secs(1));
|
||||
pin!(timeout);
|
||||
|
||||
tx.send(true).expect("failed to request shutdown");
|
||||
|
||||
loop {
|
||||
select! {
|
||||
_ = handle => {
|
||||
println!("INFO: task completed");
|
||||
break;
|
||||
},
|
||||
_ = &mut timeout => {
|
||||
panic!("signal thread failed to stop");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user