diff --git a/src/agent/src/console.rs b/src/agent/src/console.rs new file mode 100644 index 000000000..97aa95d4e --- /dev/null +++ b/src/agent/src/console.rs @@ -0,0 +1,294 @@ +// Copyright (c) 2021 Ant Group +// Copyright (c) 2021 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use crate::util; +use anyhow::{anyhow, Result}; +use nix::fcntl::{self, FcntlArg, FdFlag, OFlag}; +use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; +use nix::pty::{openpty, OpenptyResult}; +use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; +use nix::sys::stat::Mode; +use nix::sys::wait; +use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid}; +use rustjail::pipestream::PipeStream; +use slog::Logger; +use std::ffi::CString; +use std::os::unix::io::{FromRawFd, RawFd}; +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use std::sync::Mutex as SyncMutex; + +use futures::StreamExt; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::select; +use tokio::sync::watch::Receiver; + +const CONSOLE_PATH: &str = "/dev/console"; + +lazy_static! { + static ref SHELLS: Arc>> = { + let mut v = Vec::new(); + + if !cfg!(test) { + v.push("/bin/bash".to_string()); + v.push("/bin/sh".to_string()); + } + + Arc::new(SyncMutex::new(v)) + }; +} + +pub fn initialize() { + lazy_static::initialize(&SHELLS); +} + +pub async fn debug_console_handler( + logger: Logger, + port: u32, + mut shutdown: Receiver, +) -> Result<()> { + let logger = logger.new(o!("subsystem" => "debug-console")); + + let shells = SHELLS.lock().unwrap().to_vec(); + + let shell = shells + .into_iter() + .find(|sh| PathBuf::from(sh).exists()) + .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; + + if port > 0 { + let listenfd = socket::socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::SOCK_CLOEXEC, + None, + )?; + let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port); + socket::bind(listenfd, &addr)?; + socket::listen(listenfd, 1)?; + + let mut incoming = util::get_vsock_incoming(listenfd); + + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "debug console got shutdown request"); + break; + } + + conn = incoming.next() => { + if let Some(conn) = conn { + // Accept a new connection + match conn { + Ok(stream) => { + let logger = logger.clone(); + let shell = shell.clone(); + // Do not block(await) here, or we'll never receive the shutdown signal + tokio::spawn(async move { + let _ = run_debug_console_vsock(logger, shell, stream).await; + }); + } + Err(e) => { + error!(logger, "{:?}", e); + } + } + } else { + break; + } + } + } + } + } else { + let mut flags = OFlag::empty(); + flags.insert(OFlag::O_RDWR); + flags.insert(OFlag::O_CLOEXEC); + + let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; + + select! { + _ = shutdown.changed() => { + info!(logger, "debug console got shutdown request"); + } + + result = run_debug_console_serial(shell.clone(), fd) => { + match result { + Ok(_) => { + info!(logger, "run_debug_console_shell session finished"); + } + Err(err) => { + error!(logger, "run_debug_console_shell failed: {:?}", err); + } + } + } + } + }; + + Ok(()) +} + +fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> { + // create new session with child as session leader + setsid()?; + + // dup stdin, stdout, stderr to let child act as a terminal + dup2(slave_fd, STDIN_FILENO)?; + dup2(slave_fd, STDOUT_FILENO)?; + dup2(slave_fd, STDERR_FILENO)?; + + // set tty + unsafe { + libc::ioctl(0, libc::TIOCSCTTY); + } + + let cmd = CString::new(shell).unwrap(); + + // run shell + let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e { + nix::Error::Sys(errno) => { + std::process::exit(errno as i32); + } + _ => std::process::exit(-2), + }); + + Ok(()) +} + +async fn run_in_parent( + logger: Logger, + stream: T, + pseudo: OpenptyResult, + child_pid: Pid, +) -> Result<()> { + info!(logger, "get debug shell pid {:?}", child_pid); + + let master_fd = pseudo.master; + let _ = close(pseudo.slave); + + let (mut socket_reader, mut socket_writer) = tokio::io::split(stream); + let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd)); + + select! { + res = tokio::io::copy(&mut master_reader, &mut socket_writer) => { + debug!( + logger, + "master closed: {:?}", res + ); + } + res = tokio::io::copy(&mut socket_reader, &mut master_writer) => { + info!( + logger, + "socket closed: {:?}", res + ); + } + } + + let wait_status = wait::waitpid(child_pid, None); + info!(logger, "debug console process exit code: {:?}", wait_status); + + Ok(()) +} + +async fn run_debug_console_vsock( + logger: Logger, + shell: String, + stream: T, +) -> Result<()> { + let logger = logger.new(o!("subsystem" => "debug-console-shell")); + + let pseudo = openpty(None, None)?; + let _ = fcntl::fcntl(pseudo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); + let _ = fcntl::fcntl(pseudo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); + + let slave_fd = pseudo.slave; + + match fork() { + Ok(ForkResult::Child) => run_in_child(slave_fd, shell), + Ok(ForkResult::Parent { child: child_pid }) => { + run_in_parent(logger.clone(), stream, pseudo, child_pid).await + } + Err(err) => Err(anyhow!("fork error: {:?}", err)), + } +} + +async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> { + let mut child = match tokio::process::Command::new(shell) + .arg("-i") + .kill_on_drop(true) + .stdin(unsafe { Stdio::from_raw_fd(fd) }) + .stdout(unsafe { Stdio::from_raw_fd(fd) }) + .stderr(unsafe { Stdio::from_raw_fd(fd) }) + .spawn() + { + Ok(c) => c, + Err(_) => return Err(anyhow!("failed to spawn shell")), + }; + + child.wait().await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + use tokio::sync::watch; + + #[tokio::test] + async fn test_setup_debug_console_no_shells() { + { + // Guarantee no shells have been added + // (required to avoid racing with + // test_setup_debug_console_invalid_shell()). + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); + shells.clear(); + } + + let logger = slog_scope::logger(); + + let (_, rx) = watch::channel(true); + let result = debug_console_handler(logger, 0, rx).await; + + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "no shell found to launch debug console" + ); + } + + #[tokio::test] + async fn test_setup_debug_console_invalid_shell() { + { + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); + + let dir = tempdir().expect("failed to create tmpdir"); + + // Add an invalid shell + let shell = dir + .path() + .join("enoent") + .to_str() + .expect("failed to construct shell path") + .to_string(); + + shells.push(shell); + } + + let logger = slog_scope::logger(); + + let (_, rx) = watch::channel(true); + let result = debug_console_handler(logger, 0, rx).await; + + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "no shell found to launch debug console" + ); + } +} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index d546ab863..595951bc5 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -20,26 +20,21 @@ extern crate scopeguard; extern crate slog; use anyhow::{anyhow, Context, Result}; -use nix::fcntl::{self, OFlag}; -use nix::fcntl::{FcntlArg, FdFlag}; -use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; -use nix::pty; -use nix::sys::select::{select, FdSet}; +use nix::fcntl::OFlag; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; -use nix::sys::wait; -use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; +use nix::unistd::{self, dup, Pid}; use std::env; -use std::ffi::{CStr, CString, OsStr}; +use std::ffi::OsStr; use std::fs::{self, File}; -use std::io::{Read, Write}; use std::os::unix::ffi::OsStrExt; use std::os::unix::fs as unixfs; use std::os::unix::io::AsRawFd; use std::path::Path; +use std::process::exit; use std::sync::Arc; -use unistd::Pid; mod config; +mod console; mod device; mod linux_abi; mod metrics; @@ -63,10 +58,7 @@ use signal::setup_signal_handler; use slog::Logger; use uevent::watch_uevents; -use std::sync::Mutex as SyncMutex; - use futures::future::join_all; -use futures::StreamExt as _; use rustjail::pipestream::PipeStream; use tokio::{ io::AsyncWrite, @@ -76,15 +68,11 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_vsock::{Incoming, VsockListener, VsockStream}; mod rpc; const NAME: &str = "kata-agent"; const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline"; -const CONSOLE_PATH: &str = "/dev/console"; - -const DEFAULT_BUF_SIZE: usize = 8 * 1024; lazy_static! { static ref AGENT_CONFIG: Arc> = @@ -104,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) { ); } -fn set_fd_close_exec(fd: RawFd) -> Result { - if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) { - return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e)); - } - Ok(fd) -} - -fn get_vsock_incoming(fd: RawFd) -> Incoming { - let incoming; - unsafe { - incoming = VsockListener::from_raw_fd(fd).incoming(); - } - incoming -} - -async fn get_vsock_stream(fd: RawFd) -> Result { - let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap(); - set_fd_close_exec(stream.as_raw_fd())?; - Ok(stream) -} - // Create a thread to handle reading from the logger pipe. The thread will // output to the vsock port specified, or stdout. async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver) -> Result<()> { @@ -143,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver std::result::Result<(), Box> { // List of tasks that need to be stopped for a clean shutdown let mut tasks: Vec>> = vec![]; - lazy_static::initialize(&SHELLS); + console::initialize(); lazy_static::initialize(&AGENT_CONFIG); @@ -299,26 +266,17 @@ async fn start_sandbox( tasks: &mut Vec>>, shutdown: Receiver, ) -> Result<()> { - let shells = SHELLS.clone(); let debug_console_vport = config.debug_console_vport as u32; - let shell_handle = if config.debug_console { - let thread_logger = logger.clone(); - let shells = shells.lock().unwrap().to_vec(); + if config.debug_console { + let debug_console_task = tokio::task::spawn(console::debug_console_handler( + logger.clone(), + debug_console_vport, + shutdown.clone(), + )); - let handle = tokio::task::spawn_blocking(move || { - let result = setup_debug_console(&thread_logger, shells, debug_console_vport); - if result.is_err() { - // Report error, but don't fail - warn!(thread_logger, "failed to setup debug console"; - "error" => format!("{}", result.unwrap_err())); - } - }); - - Some(handle) - } else { - None - }; + tasks.push(debug_console_task); + } // Initialize unique sandbox structure. let s = Sandbox::new(&logger).context("Failed to create sandbox")?; @@ -350,10 +308,6 @@ async fn start_sandbox( let _ = rx.await?; server.shutdown().await?; - if let Some(handle) = shell_handle { - handle.await.map_err(|e| anyhow!("{:?}", e))?; - } - Ok(()) } @@ -404,284 +358,5 @@ fn sethostname(hostname: &OsStr) -> Result<()> { } } -lazy_static! { - static ref SHELLS: Arc>> = { - let mut v = Vec::new(); - - if !cfg!(test) { - v.push("/bin/bash".to_string()); - v.push("/bin/sh".to_string()); - } - - Arc::new(SyncMutex::new(v)) - }; -} - use crate::config::AgentConfig; -use nix::sys::stat::Mode; use std::os::unix::io::{FromRawFd, RawFd}; -use std::path::PathBuf; -use std::process::exit; - -fn setup_debug_console(logger: &Logger, shells: Vec, port: u32) -> Result<()> { - let shell = shells - .iter() - .find(|sh| PathBuf::from(sh).exists()) - .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; - - if port > 0 { - let listenfd = socket::socket( - AddressFamily::Vsock, - SockType::Stream, - SockFlag::SOCK_CLOEXEC, - None, - )?; - let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port); - socket::bind(listenfd, &addr)?; - socket::listen(listenfd, 1)?; - loop { - let f: RawFd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?; - match run_debug_console_shell(logger, shell, f) { - Ok(_) => { - info!(logger, "run_debug_console_shell session finished"); - } - Err(err) => { - error!(logger, "run_debug_console_shell failed: {:?}", err); - } - } - } - } else { - let mut flags = OFlag::empty(); - flags.insert(OFlag::O_RDWR); - flags.insert(OFlag::O_CLOEXEC); - loop { - let f: RawFd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - match run_debug_console_shell(logger, shell, f) { - Ok(_) => { - info!(logger, "run_debug_console_shell session finished"); - } - Err(err) => { - error!(logger, "run_debug_console_shell failed: {:?}", err); - } - } - } - }; -} - -fn io_copy(reader: &mut R, writer: &mut W) -> std::io::Result -where - R: Read, - W: Write, -{ - let mut buf = [0; DEFAULT_BUF_SIZE]; - let buf_len; - - match reader.read(&mut buf) { - Ok(0) => return Ok(0), - Ok(len) => buf_len = len, - Err(err) => return Err(err), - }; - - // write and return - match writer.write_all(&buf[..buf_len]) { - Ok(_) => Ok(buf_len as u64), - Err(err) => Err(err), - } -} - -fn run_debug_console_shell(logger: &Logger, shell: &str, socket_fd: RawFd) -> Result<()> { - let pseduo = pty::openpty(None, None)?; - let _ = fcntl::fcntl(pseduo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); - let _ = fcntl::fcntl(pseduo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); - - let slave_fd = pseduo.slave; - - match fork() { - Ok(ForkResult::Child) => { - // create new session with child as session leader - setsid()?; - - // dup stdin, stdout, stderr to let child act as a terminal - dup2(slave_fd, STDIN_FILENO)?; - dup2(slave_fd, STDOUT_FILENO)?; - dup2(slave_fd, STDERR_FILENO)?; - - // set tty - unsafe { - libc::ioctl(0, libc::TIOCSCTTY); - } - - let cmd = CString::new(shell).unwrap(); - let args: Vec<&CStr> = vec![]; - - // run shell - let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e { - nix::Error::Sys(errno) => { - std::process::exit(errno as i32); - } - _ => std::process::exit(-2), - }); - } - - Ok(ForkResult::Parent { child: child_pid }) => { - info!(logger, "get debug shell pid {:?}", child_pid); - - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; - let master_fd = pseduo.master; - let debug_shell_logger = logger.clone(); - - // channel that used to sync between thread and main process - let (tx, rx) = std::sync::mpsc::channel::(); - - // start a thread to do IO copy between socket and pseduo.master - std::thread::spawn(move || { - let mut master_reader = unsafe { File::from_raw_fd(master_fd) }; - let mut master_writer = unsafe { File::from_raw_fd(master_fd) }; - let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) }; - let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) }; - - loop { - let mut fd_set = FdSet::new(); - fd_set.insert(rfd); - fd_set.insert(master_fd); - fd_set.insert(socket_fd); - - match select( - Some(fd_set.highest().unwrap() + 1), - &mut fd_set, - None, - None, - None, - ) { - Ok(_) => (), - Err(e) => { - if e == nix::Error::from(nix::errno::Errno::EINTR) { - continue; - } else { - error!(debug_shell_logger, "select error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - - if fd_set.contains(rfd) { - info!( - debug_shell_logger, - "debug shell process {} exited", child_pid - ); - tx.send(1).unwrap(); - break; - } - - if fd_set.contains(master_fd) { - match io_copy(&mut master_reader, &mut socket_writer) { - Ok(0) => { - debug!(debug_shell_logger, "master fd closed"); - tx.send(1).unwrap(); - break; - } - Ok(_) => {} - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => { - error!(debug_shell_logger, "read master fd error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - - if fd_set.contains(socket_fd) { - match io_copy(&mut socket_reader, &mut master_writer) { - Ok(0) => { - debug!(debug_shell_logger, "socket fd closed"); - tx.send(1).unwrap(); - break; - } - Ok(_) => {} - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => { - error!(debug_shell_logger, "read socket fd error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - } - }); - - let wait_status = wait::waitpid(child_pid, None); - info!(logger, "debug console process exit code: {:?}", wait_status); - - info!(logger, "notify debug monitor thread to exit"); - // close pipe to exit select loop - let _ = close(wfd); - - // wait for thread exit. - let _ = rx.recv().unwrap(); - info!(logger, "debug monitor thread has exited"); - - // close files - let _ = close(rfd); - let _ = close(master_fd); - let _ = close(slave_fd); - } - Err(err) => { - return Err(anyhow!("fork error: {:?}", err)); - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::tempdir; - - #[test] - fn test_setup_debug_console_no_shells() { - // Guarantee no shells have been added - // (required to avoid racing with - // test_setup_debug_console_invalid_shell()). - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); - shells.clear(); - let logger = slog_scope::logger(); - - let result = setup_debug_console(&logger, shells.to_vec(), 0); - - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "no shell found to launch debug console" - ); - } - - #[test] - fn test_setup_debug_console_invalid_shell() { - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); - - let dir = tempdir().expect("failed to create tmpdir"); - - // Add an invalid shell - let shell = dir - .path() - .join("enoent") - .to_str() - .expect("failed to construct shell path") - .to_string(); - - shells.push(shell); - let logger = slog_scope::logger(); - - let result = setup_debug_console(&logger, shells.to_vec(), 0); - - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "no shell found to launch debug console" - ); - } -} diff --git a/src/agent/src/util.rs b/src/agent/src/util.rs index 314d05a25..82877f8ba 100644 --- a/src/agent/src/util.rs +++ b/src/agent/src/util.rs @@ -3,10 +3,14 @@ // SPDX-License-Identifier: Apache-2.0 // +use anyhow::Result; +use futures::StreamExt; use std::io; use std::io::ErrorKind; +use std::os::unix::io::{FromRawFd, RawFd}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::watch::Receiver; +use tokio_vsock::{Incoming, VsockListener, VsockStream}; // Size of I/O read buffer const BUF_SIZE: usize = 8192; @@ -52,6 +56,15 @@ where Ok(total_bytes) } +pub fn get_vsock_incoming(fd: RawFd) -> Incoming { + unsafe { VsockListener::from_raw_fd(fd).incoming() } +} + +pub async fn get_vsock_stream(fd: RawFd) -> Result { + let stream = get_vsock_incoming(fd).next().await.unwrap()?; + Ok(stream) +} + #[cfg(test)] mod tests { use super::*;