Files
kata-containers/src/agent/src/main.rs
fupan.lfp a05e137710 agent: re-enable the standard SIGPIPE behavior
The Rust standard library had suppressed the default SIGPIPE
behavior, see https://github.com/rust-lang/rust/pull/13158.
Since the parent's signal handler would be inherited by it's child
process, thus we should re-enable the standard SIGPIPE behavior as a
workaround.

Fixes: #1887

Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
(cherry picked from commit 0ae364c8eb)
Signed-off-by: Fabiano Fidêncio <fidencio@redhat.com>
2021-05-31 16:56:15 +02:00

375 lines
10 KiB
Rust

// Copyright (c) 2019 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate lazy_static;
extern crate oci;
extern crate prctl;
extern crate prometheus;
extern crate protocols;
extern crate regex;
extern crate scan_fmt;
extern crate serde_json;
#[macro_use]
extern crate scopeguard;
#[macro_use]
extern crate slog;
use anyhow::{anyhow, Context, Result};
use nix::fcntl::OFlag;
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::unistd::{self, dup, Pid};
use std::env;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs as unixfs;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::process::exit;
use std::sync::Arc;
mod config;
mod console;
mod device;
mod linux_abi;
mod metrics;
mod mount;
mod namespace;
mod netlink;
mod network;
mod pci;
pub mod random;
mod sandbox;
mod signal;
#[cfg(test)]
mod test_utils;
mod uevent;
mod util;
mod version;
use mount::{cgroups_mount, general_mount};
use sandbox::Sandbox;
use signal::setup_signal_handler;
use slog::Logger;
use uevent::watch_uevents;
use futures::future::join_all;
use rustjail::pipestream::PipeStream;
use tokio::{
io::AsyncWrite,
sync::{
watch::{channel, Receiver},
Mutex, RwLock,
},
task::JoinHandle,
};
mod rpc;
const NAME: &str = "kata-agent";
const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline";
lazy_static! {
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> =
Arc::new(RwLock::new(config::AgentConfig::new()));
}
fn announce(logger: &Logger, config: &AgentConfig) {
info!(logger, "announce";
"agent-commit" => version::VERSION_COMMIT,
// Avoid any possibility of confusion with the old agent
"agent-type" => "rust",
"agent-version" => version::AGENT_VERSION,
"api-version" => version::API_VERSION,
"config" => format!("{:?}", config),
);
}
// Create a thread to handle reading from the logger pipe. The thread will
// output to the vsock port specified, or stdout.
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
let mut reader = PipeStream::from_fd(rfd);
let mut writer: Box<dyn AsyncWrite + Unpin + Send>;
if vsock_port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, vsock_port);
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
writer = Box::new(util::get_vsock_stream(listenfd).await.unwrap());
} else {
writer = Box::new(tokio::io::stdout());
}
let _ = util::interruptable_io_copier(&mut reader, &mut writer, shutdown).await;
Ok(())
}
async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
env::set_var("RUST_BACKTRACE", "full");
// List of tasks that need to be stopped for a clean shutdown
let mut tasks: Vec<JoinHandle<Result<()>>> = vec![];
console::initialize();
lazy_static::initialize(&AGENT_CONFIG);
// support vsock log
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let (shutdown_tx, shutdown_rx) = channel(true);
let agent_config = AGENT_CONFIG.clone();
let init_mode = unistd::getpid() == Pid::from_raw(1);
if init_mode {
// dup a new file descriptor for this temporary logger writer,
// since this logger would be dropped and it's writer would
// be closed out of this code block.
let newwfd = dup(wfd)?;
let writer = unsafe { File::from_raw_fd(newwfd) };
// Init a temporary logger used by init agent as init process
// since before do the base mount, it wouldn't access "/proc/cmdline"
// to get the customzied debug level.
let (logger, logger_async_guard) =
logging::create_logger(NAME, "agent", slog::Level::Debug, writer);
// Must mount proc fs before parsing kernel command line
general_mount(&logger).map_err(|e| {
error!(logger, "fail general mount: {}", e);
e
})?;
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?;
drop(logger_async_guard);
} else {
// once parsed cmdline and set the config, release the write lock
// as soon as possible in case other thread would get read lock on
// it.
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
}
let config = agent_config.read().await;
let log_vport = config.log_vport as u32;
let log_handle = tokio::spawn(create_logger_task(rfd, log_vport, shutdown_rx.clone()));
tasks.push(log_handle);
let writer = unsafe { File::from_raw_fd(wfd) };
// Recreate a logger with the log level get from "/proc/cmdline".
let (logger, logger_async_guard) =
logging::create_logger(NAME, "agent", config.log_level, writer);
announce(&logger, &config);
// This variable is required as it enables the global (and crucially static) logger,
// which is required to satisfy the the lifetime constraints of the auto-generated gRPC code.
let global_logger = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc")));
// Allow the global logger to be modified later (for shutdown)
global_logger.cancel_reset();
let mut ttrpc_log_guard: Result<(), log::SetLoggerError> = Ok(());
if config.log_level == slog::Level::Trace {
// Redirect ttrpc log calls to slog iff full debug requested
ttrpc_log_guard = Ok(slog_stdlog::init().map_err(|e| e)?);
}
// Start the sandbox and wait for its ttRPC server to end
start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?;
// Install a NOP logger for the remainder of the shutdown sequence
// to ensure any log calls made by local crates using the scope logger
// don't fail.
let global_logger_guard2 =
slog_scope::set_global_logger(slog::Logger::root(slog::Discard, o!()));
global_logger_guard2.cancel_reset();
drop(logger_async_guard);
drop(ttrpc_log_guard);
// Trigger a controlled shutdown
shutdown_tx
.send(true)
.map_err(|e| anyhow!(e).context("failed to request shutdown"))?;
// Wait for all threads to finish
let results = join_all(tasks).await;
for result in results {
if let Err(e) = result {
return Err(anyhow!(e).into());
}
}
eprintln!("{} shutdown complete", NAME);
Ok(())
}
fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() == 2 && args[1] == "--version" {
println!(
"{} version {} (api version: {}, commit version: {}, type: rust)",
NAME,
version::AGENT_VERSION,
version::API_VERSION,
version::VERSION_COMMIT,
);
exit(0);
}
if args.len() == 2 && args[1] == "init" {
reset_sigpipe();
rustjail::container::init_child();
exit(0);
}
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(real_main())
}
async fn start_sandbox(
logger: &Logger,
config: &AgentConfig,
init_mode: bool,
tasks: &mut Vec<JoinHandle<Result<()>>>,
shutdown: Receiver<bool>,
) -> Result<()> {
let debug_console_vport = config.debug_console_vport as u32;
if config.debug_console {
let debug_console_task = tokio::task::spawn(console::debug_console_handler(
logger.clone(),
debug_console_vport,
shutdown.clone(),
));
tasks.push(debug_console_task);
}
// Initialize unique sandbox structure.
let s = Sandbox::new(&logger).context("Failed to create sandbox")?;
if init_mode {
s.rtnl.handle_localhost().await?;
}
let sandbox = Arc::new(Mutex::new(s));
let signal_handler_task = tokio::spawn(setup_signal_handler(
logger.clone(),
sandbox.clone(),
shutdown.clone(),
));
tasks.push(signal_handler_task);
let uevents_handler_task = tokio::spawn(watch_uevents(sandbox.clone(), shutdown.clone()));
tasks.push(uevents_handler_task);
let (tx, rx) = tokio::sync::oneshot::channel();
sandbox.lock().await.sender = Some(tx);
// vsock:///dev/vsock, port
let mut server = rpc::start(sandbox.clone(), config.server_addr.as_str());
server.start().await?;
let _ = rx.await?;
server.shutdown().await?;
Ok(())
}
// init_agent_as_init will do the initializations such as setting up the rootfs
// when this agent has been run as the init process.
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
cgroups_mount(logger, unified_cgroup_hierarchy).map_err(|e| {
error!(
logger,
"fail cgroups mount, unified_cgroup_hierarchy {}: {}", unified_cgroup_hierarchy, e
);
e
})?;
fs::remove_file(Path::new("/dev/ptmx"))?;
unixfs::symlink(Path::new("/dev/pts/ptmx"), Path::new("/dev/ptmx"))?;
unistd::setsid()?;
unsafe {
libc::ioctl(std::io::stdin().as_raw_fd(), libc::TIOCSCTTY, 1);
}
env::set_var("PATH", "/bin:/sbin/:/usr/bin/:/usr/sbin/");
let contents =
std::fs::read_to_string("/etc/hostname").unwrap_or_else(|_| String::from("localhost"));
let contents_array: Vec<&str> = contents.split(' ').collect();
let hostname = contents_array[0].trim();
if sethostname(OsStr::new(hostname)).is_err() {
warn!(logger, "failed to set hostname");
}
Ok(())
}
fn sethostname(hostname: &OsStr) -> Result<()> {
let size = hostname.len() as usize;
let result =
unsafe { libc::sethostname(hostname.as_bytes().as_ptr() as *const libc::c_char, size) };
if result != 0 {
Err(anyhow!("failed to set hostname"))
} else {
Ok(())
}
}
// The Rust standard library had suppressed the default SIGPIPE behavior,
// see https://github.com/rust-lang/rust/pull/13158.
// Since the parent's signal handler would be inherited by it's child process,
// thus we should re-enable the standard SIGPIPE behavior as a workaround to
// fix the issue of https://github.com/kata-containers/kata-containers/issues/1887.
fn reset_sigpipe() {
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}
}
use crate::config::AgentConfig;
use std::os::unix::io::{FromRawFd, RawFd};