mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-05-01 13:14:33 +00:00
signal: Move to a new module
Move the signal handling code into a new module and refactor into the main handler and a new SIGCHLD handling function to make the code simpler and easier to understand. Also added a unit test for shutdown. Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
parent
011f7d785a
commit
d8d5b4cd1d
@ -26,9 +26,8 @@ use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
|
|||||||
use nix::pty;
|
use nix::pty;
|
||||||
use nix::sys::select::{select, FdSet};
|
use nix::sys::select::{select, FdSet};
|
||||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||||
use nix::sys::wait::{self, WaitStatus};
|
use nix::sys::wait;
|
||||||
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
|
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
|
||||||
use prctl::set_child_subreaper;
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::ffi::{CStr, CString, OsStr};
|
use std::ffi::{CStr, CString, OsStr};
|
||||||
@ -52,6 +51,7 @@ mod network;
|
|||||||
mod pci;
|
mod pci;
|
||||||
pub mod random;
|
pub mod random;
|
||||||
mod sandbox;
|
mod sandbox;
|
||||||
|
mod signal;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test_utils;
|
mod test_utils;
|
||||||
mod uevent;
|
mod uevent;
|
||||||
@ -60,6 +60,7 @@ mod version;
|
|||||||
|
|
||||||
use mount::{cgroups_mount, general_mount};
|
use mount::{cgroups_mount, general_mount};
|
||||||
use sandbox::Sandbox;
|
use sandbox::Sandbox;
|
||||||
|
use signal::setup_signal_handler;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
use uevent::watch_uevents;
|
use uevent::watch_uevents;
|
||||||
|
|
||||||
@ -70,7 +71,6 @@ use futures::StreamExt as _;
|
|||||||
use rustjail::pipestream::PipeStream;
|
use rustjail::pipestream::PipeStream;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::AsyncWrite,
|
io::AsyncWrite,
|
||||||
signal::unix::{signal, SignalKind},
|
|
||||||
sync::{
|
sync::{
|
||||||
oneshot::Sender,
|
oneshot::Sender,
|
||||||
watch::{channel, Receiver},
|
watch::{channel, Receiver},
|
||||||
@ -237,7 +237,7 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the sandbox and wait for its ttRPC server to end
|
// Start the sandbox and wait for its ttRPC server to end
|
||||||
start_sandbox(&logger, &config, init_mode).await?;
|
start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?;
|
||||||
|
|
||||||
// Install a NOP logger for the remainder of the shutdown sequence
|
// Install a NOP logger for the remainder of the shutdown sequence
|
||||||
// to ensure any log calls made by local crates using the scope logger
|
// to ensure any log calls made by local crates using the scope logger
|
||||||
@ -296,7 +296,13 @@ fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
rt.block_on(real_main())
|
rt.block_on(real_main())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> {
|
async fn start_sandbox(
|
||||||
|
logger: &Logger,
|
||||||
|
config: &AgentConfig,
|
||||||
|
init_mode: bool,
|
||||||
|
tasks: &mut Vec<JoinHandle<Result<()>>>,
|
||||||
|
shutdown: Receiver<bool>,
|
||||||
|
) -> Result<()> {
|
||||||
let shells = SHELLS.clone();
|
let shells = SHELLS.clone();
|
||||||
let debug_console_vport = config.debug_console_vport as u32;
|
let debug_console_vport = config.debug_console_vport as u32;
|
||||||
|
|
||||||
@ -326,9 +332,14 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
|
|||||||
|
|
||||||
let sandbox = Arc::new(Mutex::new(s));
|
let sandbox = Arc::new(Mutex::new(s));
|
||||||
|
|
||||||
setup_signal_handler(&logger, sandbox.clone())
|
let signal_handler_task = tokio::spawn(setup_signal_handler(
|
||||||
.await
|
logger.clone(),
|
||||||
.unwrap();
|
sandbox.clone(),
|
||||||
|
shutdown.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
tasks.push(signal_handler_task);
|
||||||
|
|
||||||
watch_uevents(sandbox.clone()).await;
|
watch_uevents(sandbox.clone()).await;
|
||||||
|
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
@ -348,93 +359,6 @@ async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
use nix::sys::wait::WaitPidFlag;
|
|
||||||
|
|
||||||
async fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
|
|
||||||
let logger = logger.new(o!("subsystem" => "signals"));
|
|
||||||
|
|
||||||
set_child_subreaper(true)
|
|
||||||
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
|
|
||||||
|
|
||||||
let mut signal_stream = signal(SignalKind::child())?;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
'outer: loop {
|
|
||||||
signal_stream.recv().await;
|
|
||||||
info!(logger, "received signal"; "signal" => "SIGCHLD");
|
|
||||||
|
|
||||||
// several signals can be combined together
|
|
||||||
// as one. So loop around to reap all
|
|
||||||
// exited children
|
|
||||||
'inner: loop {
|
|
||||||
let wait_status = match wait::waitpid(
|
|
||||||
Some(Pid::from_raw(-1)),
|
|
||||||
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
|
|
||||||
) {
|
|
||||||
Ok(s) => {
|
|
||||||
if s == WaitStatus::StillAlive {
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
s
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
info!(
|
|
||||||
logger,
|
|
||||||
"waitpid reaper failed";
|
|
||||||
"error" => e.as_errno().unwrap().desc()
|
|
||||||
);
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
|
|
||||||
|
|
||||||
let pid = wait_status.pid();
|
|
||||||
if let Some(pid) = pid {
|
|
||||||
let raw_pid = pid.as_raw();
|
|
||||||
let child_pid = format!("{}", raw_pid);
|
|
||||||
|
|
||||||
let logger = logger.new(o!("child-pid" => child_pid));
|
|
||||||
|
|
||||||
let mut sandbox = sandbox.lock().await;
|
|
||||||
let process = sandbox.find_process(raw_pid);
|
|
||||||
if process.is_none() {
|
|
||||||
info!(logger, "child exited unexpectedly");
|
|
||||||
continue 'inner;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut p = process.unwrap();
|
|
||||||
|
|
||||||
if p.exit_pipe_w.is_none() {
|
|
||||||
error!(logger, "the process's exit_pipe_w isn't set");
|
|
||||||
continue 'inner;
|
|
||||||
}
|
|
||||||
let pipe_write = p.exit_pipe_w.unwrap();
|
|
||||||
let ret: i32;
|
|
||||||
|
|
||||||
match wait_status {
|
|
||||||
WaitStatus::Exited(_, c) => ret = c,
|
|
||||||
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
|
|
||||||
_ => {
|
|
||||||
info!(logger, "got wrong status for process";
|
|
||||||
"child-status" => format!("{:?}", wait_status));
|
|
||||||
continue 'inner;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
p.exit_code = ret;
|
|
||||||
let _ = unistd::close(pipe_write);
|
|
||||||
|
|
||||||
info!(logger, "notify term to close");
|
|
||||||
// close the socket file to notify readStdio to close terminal specifically
|
|
||||||
// in case this process's terminal has been inherited by its children.
|
|
||||||
p.notify_term_close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// init_agent_as_init will do the initializations such as setting up the rootfs
|
// init_agent_as_init will do the initializations such as setting up the rootfs
|
||||||
// when this agent has been run as the init process.
|
// when this agent has been run as the init process.
|
||||||
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
|
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
|
||||||
|
159
src/agent/src/signal.rs
Normal file
159
src/agent/src/signal.rs
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
// Copyright (c) 2019-2020 Ant Financial
|
||||||
|
// Copyright (c) 2020 Intel Corporation
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
||||||
|
|
||||||
|
use crate::sandbox::Sandbox;
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use nix::sys::wait::WaitPidFlag;
|
||||||
|
use nix::sys::wait::{self, WaitStatus};
|
||||||
|
use nix::unistd;
|
||||||
|
use prctl::set_child_subreaper;
|
||||||
|
use slog::{error, info, o, Logger};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
|
use tokio::sync::watch::Receiver;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use unistd::Pid;
|
||||||
|
|
||||||
|
async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result<()> {
|
||||||
|
info!(logger, "handling signal"; "signal" => "SIGCHLD");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let result = wait::waitpid(
|
||||||
|
Some(Pid::from_raw(-1)),
|
||||||
|
Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL),
|
||||||
|
);
|
||||||
|
|
||||||
|
let wait_status = match result {
|
||||||
|
Ok(s) => {
|
||||||
|
if s == WaitStatus::StillAlive {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
s
|
||||||
|
}
|
||||||
|
Err(e) => return Err(anyhow!(e).context("waitpid reaper failed")),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
|
||||||
|
|
||||||
|
if let Some(pid) = wait_status.pid() {
|
||||||
|
let raw_pid = pid.as_raw();
|
||||||
|
let child_pid = format!("{}", raw_pid);
|
||||||
|
|
||||||
|
let logger = logger.new(o!("child-pid" => child_pid));
|
||||||
|
|
||||||
|
let sandbox_ref = sandbox.clone();
|
||||||
|
let mut sandbox = sandbox_ref.lock().await;
|
||||||
|
|
||||||
|
let process = sandbox.find_process(raw_pid);
|
||||||
|
if process.is_none() {
|
||||||
|
info!(logger, "child exited unexpectedly");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut p = process.unwrap();
|
||||||
|
|
||||||
|
if p.exit_pipe_w.is_none() {
|
||||||
|
info!(logger, "process exit pipe not set");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let pipe_write = p.exit_pipe_w.unwrap();
|
||||||
|
let ret: i32;
|
||||||
|
|
||||||
|
match wait_status {
|
||||||
|
WaitStatus::Exited(_, c) => ret = c,
|
||||||
|
WaitStatus::Signaled(_, sig, _) => ret = sig as i32,
|
||||||
|
_ => {
|
||||||
|
info!(logger, "got wrong status for process";
|
||||||
|
"child-status" => format!("{:?}", wait_status));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.exit_code = ret;
|
||||||
|
let _ = unistd::close(pipe_write);
|
||||||
|
|
||||||
|
info!(logger, "notify term to close");
|
||||||
|
// close the socket file to notify readStdio to close terminal specifically
|
||||||
|
// in case this process's terminal has been inherited by its children.
|
||||||
|
p.notify_term_close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn setup_signal_handler(
|
||||||
|
logger: Logger,
|
||||||
|
sandbox: Arc<Mutex<Sandbox>>,
|
||||||
|
mut shutdown: Receiver<bool>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let logger = logger.new(o!("subsystem" => "signals"));
|
||||||
|
|
||||||
|
set_child_subreaper(true)
|
||||||
|
.map_err(|err| anyhow!(err).context("failed to setup agent as a child subreaper"))?;
|
||||||
|
|
||||||
|
let mut sigchild_stream = signal(SignalKind::child())?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_ = shutdown.changed() => {
|
||||||
|
info!(logger, "got shutdown request");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = sigchild_stream.recv() => {
|
||||||
|
let result = handle_sigchild(logger.clone(), sandbox.clone()).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(()) => (),
|
||||||
|
Err(e) => {
|
||||||
|
// Log errors, but don't abort - just wait for more signals!
|
||||||
|
error!(logger, "failed to handle signal"; "error" => format!("{:?}", e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch::channel;
    use tokio::time::{timeout, Duration};

    /// The signal handler task must stop, without error, within one second
    /// of a shutdown request being sent on the watch channel.
    #[tokio::test]
    async fn test_setup_signal_handler() {
        let logger = slog::Logger::root(slog::Discard, o!());
        let s = Sandbox::new(&logger).unwrap();

        let sandbox = Arc::new(Mutex::new(s));

        let (tx, rx) = channel(true);

        let handle = tokio::spawn(setup_signal_handler(logger, sandbox, rx));

        tx.send(true).expect("failed to request shutdown");

        // Bound the wait so that a handler which ignores the shutdown
        // request fails the test instead of hanging it.
        let join_result = timeout(Duration::from_secs(1), handle)
            .await
            .expect("signal thread failed to stop");

        // Do not discard the outcome: a panic inside the task or an error
        // returned by the handler must both fail the test.
        let handler_result = join_result.expect("signal handler task panicked");

        assert!(
            handler_result.is_ok(),
            "signal handler returned an error: {:?}",
            handler_result
        );

        println!("INFO: task completed");
    }
}
|
Loading…
Reference in New Issue
Block a user