From 9017e1100b6ec6374b91bd8cbd66ba9177b5398f Mon Sep 17 00:00:00 2001 From: "James O. D. Hunt" Date: Fri, 26 Mar 2021 17:35:06 +0000 Subject: [PATCH 1/3] agent: start to rework the debug console It's the first commit of the rework. Fixes: #1647 Signed-off-by: James O. D. Hunt --- src/agent/src/console.rs | 353 +++++++++++++++++++++++++++++++++++++++ src/agent/src/main.rs | 328 ++---------------------------------- 2 files changed, 367 insertions(+), 314 deletions(-) create mode 100644 src/agent/src/console.rs diff --git a/src/agent/src/console.rs b/src/agent/src/console.rs new file mode 100644 index 0000000000..a535836d5e --- /dev/null +++ b/src/agent/src/console.rs @@ -0,0 +1,353 @@ +// Copyright (c) 2019 Ant Financial +// Copyright (c) 2021 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use anyhow::{anyhow, Result}; +use lazy_static; +use nix::fcntl::{self, OFlag}; +use nix::fcntl::{FcntlArg, FdFlag}; +use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; +use nix::pty::{openpty, OpenptyResult}; +use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; +use nix::sys::stat::Mode; +use nix::sys::wait; +use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid}; +use rustjail::pipestream::PipeStream; +use slog::Logger; +use std::ffi::{CStr, CString}; +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::io::{FromRawFd, RawFd}; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex as SyncMutex; +use tokio::sync::watch::Receiver; +use tokio::{pin, select}; + +const CONSOLE_PATH: &str = "/dev/console"; +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +lazy_static! { + static ref SHELLS: Arc>> = { + let mut v = Vec::new(); + + if !cfg!(test) { + v.push("/bin/bash".to_string()); + v.push("/bin/sh".to_string()); + } + + Arc::new(SyncMutex::new(v)) + }; +} + +pub fn initialize() { + lazy_static::initialize(&SHELLS); +} + +pub async fn debug_console_handler( + logger: Logger, + port: u32, + mut shutdown: Receiver, +) -> Result<()> { + let logger = logger.new(o!("subsystem" => "debug-console")); + + let shells = SHELLS.clone(); + let shells = shells.lock().unwrap().to_vec(); + + let shell = shells + .iter() + .find(|sh| PathBuf::from(sh).exists()) + .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; + + let fd: RawFd; + + if port > 0 { + let listenfd = socket::socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::SOCK_CLOEXEC, + None, + )?; + let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port); + socket::bind(listenfd, &addr)?; + socket::listen(listenfd, 1)?; + + fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?; + } else { + let mut flags = OFlag::empty(); + flags.insert(OFlag::O_RDWR); + flags.insert(OFlag::O_CLOEXEC); + + fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; + }; + + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "got shutdown request"); + break; + } + + // BUG: FIXME: wait on parent. + //result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => { + // match result { + // Ok(_) => { + // info!(logger, "run_debug_console_shell session finished"); + // } + // Err(err) => { + // error!(logger, "run_debug_console_shell failed: {:?}", err); + // } + // } + //} + } + } + + Ok(()) +} + +fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { + // create new session with child as session leader + setsid()?; + + // dup stdin, stdout, stderr to let child act as a terminal + dup2(slave_fd, STDIN_FILENO)?; + dup2(slave_fd, STDOUT_FILENO)?; + dup2(slave_fd, STDERR_FILENO)?; + + // set tty + unsafe { + libc::ioctl(0, libc::TIOCSCTTY); + } + + let cmd = CString::new(shell).unwrap(); + let args: Vec<&CStr> = vec![]; + + // run shell + let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e { + nix::Error::Sys(errno) => { + std::process::exit(errno as i32); + } + _ => std::process::exit(-2), + }); + + Ok(()) +} + +async fn run_in_parent( + logger: Logger, + mut shutdown: Receiver, + socket_fd: RawFd, + pseudo: OpenptyResult, + child_pid: Pid, +) -> Result<()> { + info!(logger, "get debug shell pid {:?}", child_pid); + + let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; + let master_fd = pseudo.master; + let slave_fd = pseudo.slave; + //let debug_shell_logger = logger.clone(); + + let logger = logger.clone(); + + // channel that used to sync between thread and main process + let (tx, rx) = std::sync::mpsc::channel::(); + + // start a thread to do IO copy between socket and pseudo.master + //tokio::spawn(async move { + //let logger = logger.clone(); + //let mut shutdown = shutdown.clone(); + + //let mut master_reader = unsafe { File::from_raw_fd(master_fd) }; + //let mut master_writer = unsafe { File::from_raw_fd(master_fd) }; + //let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) }; + //let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) }; + + let mut pipe_reader = PipeStream::from_fd(rfd); + + //let mut pty_master_reader = PipeStream::from_fd(master_fd); + //let mut socket_reader = PipeStream::from_fd(socket_fd); + + //pin!(pipe_reader); + + // BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd + // (see commented out code below). + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "got shutdown request"); + break; + }, + _ = pipe_reader => { + info!( + debug_shell_logger, + "debug shell process {} exited", child_pid + ); + tx.send(1).unwrap(); + }, + + } + } + + // if fd_set.contains(master_fd) { + // match io_copy(&mut master_reader, &mut socket_writer) { + // Ok(0) => { + // debug!(debug_shell_logger, "master fd closed"); + // tx.send(1).unwrap(); + // break; + // } + // Ok(_) => {} + // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + // Err(e) => { + // error!(debug_shell_logger, "read master fd error {:?}", e); + // tx.send(1).unwrap(); + // break; + // } + // } + // } + + // if fd_set.contains(socket_fd) { + // match io_copy(&mut socket_reader, &mut master_writer) { + // Ok(0) => { + // debug!(debug_shell_logger, "socket fd closed"); + // tx.send(1).unwrap(); + // break; + // } + // Ok(_) => {} + // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + // Err(e) => { + // error!(debug_shell_logger, "read socket fd error {:?}", e); + // tx.send(1).unwrap(); + // break; + // } + // } + // } + //} + //}) + //.await; + + let wait_status = wait::waitpid(child_pid, None); + info!(logger, "debug console process exit code: {:?}", wait_status); + + info!(logger, "notify debug monitor thread to exit"); + // close pipe to exit select loop + let _ = close(wfd); + + // wait for thread exit. + let _ = rx.recv().unwrap(); + info!(logger, "debug monitor thread has exited"); + + // close files + let _ = close(rfd); + let _ = close(master_fd); + let _ = close(slave_fd); + + Ok(()) +} + +async fn run_debug_console_shell( + logger: Logger, + shell: &str, + socket_fd: RawFd, + shutdown: Receiver, +) -> Result<()> { + let logger = logger.new(o!("subsystem" => "debug-console-shell")); + + let pseudo = openpty(None, None)?; + let _ = fcntl::fcntl(pseudo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); + let _ = fcntl::fcntl(pseudo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); + + let slave_fd = pseudo.slave; + + match fork() { + Ok(ForkResult::Child) => run_in_child(slave_fd, shell), + Ok(ForkResult::Parent { child: child_pid }) => { + run_in_parent( + logger.clone(), + shutdown.clone(), + socket_fd, + pseudo, + child_pid, + ) + .await + } + Err(err) => Err(anyhow!("fork error: {:?}", err)), + } +} + +// BUG: FIXME: +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_setup_debug_console_no_shells() { + // Guarantee no shells have been added + // (required to avoid racing with + // test_setup_debug_console_invalid_shell()). + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); + shells.clear(); + let logger = slog_scope::logger(); + + let result = setup_debug_console(&logger, shells.to_vec(), 0); + + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "no shell found to launch debug console" + ); + } + + #[test] + fn test_setup_debug_console_invalid_shell() { + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); + + let dir = tempdir().expect("failed to create tmpdir"); + + // Add an invalid shell + let shell = dir + .path() + .join("enoent") + .to_str() + .expect("failed to construct shell path") + .to_string(); + + shells.push(shell); + let logger = slog_scope::logger(); + + let result = setup_debug_console(&logger, shells.to_vec(), 0); + + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "no shell found to launch debug console" + ); + } +} + +// BUG: FIXME: should not be required as we can use the +// interruptable_io_copier(). But if it is still needed, move to utils.rs. +fn io_copy(reader: &mut R, writer: &mut W) -> std::io::Result +where + R: Read, + W: Write, +{ + let mut buf = [0; DEFAULT_BUF_SIZE]; + let buf_len; + + match reader.read(&mut buf) { + Ok(0) => return Ok(0), + Ok(len) => buf_len = len, + Err(err) => return Err(err), + }; + + // write and return + match writer.write_all(&buf[..buf_len]) { + Ok(_) => Ok(buf_len as u64), + Err(err) => Err(err), + } +} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index d546ab8635..fd6dcfe846 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -22,24 +22,21 @@ extern crate slog; use anyhow::{anyhow, Context, Result}; use nix::fcntl::{self, OFlag}; use nix::fcntl::{FcntlArg, FdFlag}; -use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; -use nix::pty; -use nix::sys::select::{select, FdSet}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; -use nix::sys::wait; -use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; +use nix::unistd::{self, dup, Pid}; +use std::collections::HashMap; use std::env; -use std::ffi::{CStr, CString, OsStr}; +use std::ffi::OsStr; use std::fs::{self, File}; -use std::io::{Read, Write}; use std::os::unix::ffi::OsStrExt; use std::os::unix::fs as unixfs; use std::os::unix::io::AsRawFd; use std::path::Path; +use std::process::exit; use std::sync::Arc; -use unistd::Pid; mod config; +mod console; mod device; mod linux_abi; mod metrics; @@ -63,8 +60,6 @@ use signal::setup_signal_handler; use slog::Logger; use uevent::watch_uevents; -use std::sync::Mutex as SyncMutex; - use futures::future::join_all; use futures::StreamExt as _; use rustjail::pipestream::PipeStream; @@ -82,9 +77,6 @@ mod rpc; const NAME: &str = "kata-agent"; const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline"; -const CONSOLE_PATH: &str = "/dev/console"; - -const DEFAULT_BUF_SIZE: usize = 8 * 1024; lazy_static! { static ref AGENT_CONFIG: Arc> = @@ -159,7 +151,7 @@ async fn real_main() -> std::result::Result<(), Box> { // List of tasks that need to be stopped for a clean shutdown let mut tasks: Vec>> = vec![]; - lazy_static::initialize(&SHELLS); + console::initialize(); lazy_static::initialize(&AGENT_CONFIG); @@ -299,26 +291,17 @@ async fn start_sandbox( tasks: &mut Vec>>, shutdown: Receiver, ) -> Result<()> { - let shells = SHELLS.clone(); let debug_console_vport = config.debug_console_vport as u32; - let shell_handle = if config.debug_console { - let thread_logger = logger.clone(); - let shells = shells.lock().unwrap().to_vec(); + if config.debug_console { + let debug_console_task = tokio::task::spawn(console::debug_console_handler( + logger.clone(), + debug_console_vport, + shutdown.clone(), + )); - let handle = tokio::task::spawn_blocking(move || { - let result = setup_debug_console(&thread_logger, shells, debug_console_vport); - if result.is_err() { - // Report error, but don't fail - warn!(thread_logger, "failed to setup debug console"; - "error" => format!("{}", result.unwrap_err())); - } - }); - - Some(handle) - } else { - None - }; + tasks.push(debug_console_task); + } // Initialize unique sandbox structure. let s = Sandbox::new(&logger).context("Failed to create sandbox")?; @@ -350,10 +333,6 @@ async fn start_sandbox( let _ = rx.await?; server.shutdown().await?; - if let Some(handle) = shell_handle { - handle.await.map_err(|e| anyhow!("{:?}", e))?; - } - Ok(()) } @@ -404,284 +383,5 @@ fn sethostname(hostname: &OsStr) -> Result<()> { } } -lazy_static! { - static ref SHELLS: Arc>> = { - let mut v = Vec::new(); - - if !cfg!(test) { - v.push("/bin/bash".to_string()); - v.push("/bin/sh".to_string()); - } - - Arc::new(SyncMutex::new(v)) - }; -} - use crate::config::AgentConfig; -use nix::sys::stat::Mode; use std::os::unix::io::{FromRawFd, RawFd}; -use std::path::PathBuf; -use std::process::exit; - -fn setup_debug_console(logger: &Logger, shells: Vec, port: u32) -> Result<()> { - let shell = shells - .iter() - .find(|sh| PathBuf::from(sh).exists()) - .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; - - if port > 0 { - let listenfd = socket::socket( - AddressFamily::Vsock, - SockType::Stream, - SockFlag::SOCK_CLOEXEC, - None, - )?; - let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port); - socket::bind(listenfd, &addr)?; - socket::listen(listenfd, 1)?; - loop { - let f: RawFd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?; - match run_debug_console_shell(logger, shell, f) { - Ok(_) => { - info!(logger, "run_debug_console_shell session finished"); - } - Err(err) => { - error!(logger, "run_debug_console_shell failed: {:?}", err); - } - } - } - } else { - let mut flags = OFlag::empty(); - flags.insert(OFlag::O_RDWR); - flags.insert(OFlag::O_CLOEXEC); - loop { - let f: RawFd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - match run_debug_console_shell(logger, shell, f) { - Ok(_) => { - info!(logger, "run_debug_console_shell session finished"); - } - Err(err) => { - error!(logger, "run_debug_console_shell failed: {:?}", err); - } - } - } - }; -} - -fn io_copy(reader: &mut R, writer: &mut W) -> std::io::Result -where - R: Read, - W: Write, -{ - let mut buf = [0; DEFAULT_BUF_SIZE]; - let buf_len; - - match reader.read(&mut buf) { - Ok(0) => return Ok(0), - Ok(len) => buf_len = len, - Err(err) => return Err(err), - }; - - // write and return - match writer.write_all(&buf[..buf_len]) { - Ok(_) => Ok(buf_len as u64), - Err(err) => Err(err), - } -} - -fn run_debug_console_shell(logger: &Logger, shell: &str, socket_fd: RawFd) -> Result<()> { - let pseduo = pty::openpty(None, None)?; - let _ = fcntl::fcntl(pseduo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); - let _ = fcntl::fcntl(pseduo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)); - - let slave_fd = pseduo.slave; - - match fork() { - Ok(ForkResult::Child) => { - // create new session with child as session leader - setsid()?; - - // dup stdin, stdout, stderr to let child act as a terminal - dup2(slave_fd, STDIN_FILENO)?; - dup2(slave_fd, STDOUT_FILENO)?; - dup2(slave_fd, STDERR_FILENO)?; - - // set tty - unsafe { - libc::ioctl(0, libc::TIOCSCTTY); - } - - let cmd = CString::new(shell).unwrap(); - let args: Vec<&CStr> = vec![]; - - // run shell - let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e { - nix::Error::Sys(errno) => { - std::process::exit(errno as i32); - } - _ => std::process::exit(-2), - }); - } - - Ok(ForkResult::Parent { child: child_pid }) => { - info!(logger, "get debug shell pid {:?}", child_pid); - - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; - let master_fd = pseduo.master; - let debug_shell_logger = logger.clone(); - - // channel that used to sync between thread and main process - let (tx, rx) = std::sync::mpsc::channel::(); - - // start a thread to do IO copy between socket and pseduo.master - std::thread::spawn(move || { - let mut master_reader = unsafe { File::from_raw_fd(master_fd) }; - let mut master_writer = unsafe { File::from_raw_fd(master_fd) }; - let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) }; - let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) }; - - loop { - let mut fd_set = FdSet::new(); - fd_set.insert(rfd); - fd_set.insert(master_fd); - fd_set.insert(socket_fd); - - match select( - Some(fd_set.highest().unwrap() + 1), - &mut fd_set, - None, - None, - None, - ) { - Ok(_) => (), - Err(e) => { - if e == nix::Error::from(nix::errno::Errno::EINTR) { - continue; - } else { - error!(debug_shell_logger, "select error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - - if fd_set.contains(rfd) { - info!( - debug_shell_logger, - "debug shell process {} exited", child_pid - ); - tx.send(1).unwrap(); - break; - } - - if fd_set.contains(master_fd) { - match io_copy(&mut master_reader, &mut socket_writer) { - Ok(0) => { - debug!(debug_shell_logger, "master fd closed"); - tx.send(1).unwrap(); - break; - } - Ok(_) => {} - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => { - error!(debug_shell_logger, "read master fd error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - - if fd_set.contains(socket_fd) { - match io_copy(&mut socket_reader, &mut master_writer) { - Ok(0) => { - debug!(debug_shell_logger, "socket fd closed"); - tx.send(1).unwrap(); - break; - } - Ok(_) => {} - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => { - error!(debug_shell_logger, "read socket fd error {:?}", e); - tx.send(1).unwrap(); - break; - } - } - } - } - }); - - let wait_status = wait::waitpid(child_pid, None); - info!(logger, "debug console process exit code: {:?}", wait_status); - - info!(logger, "notify debug monitor thread to exit"); - // close pipe to exit select loop - let _ = close(wfd); - - // wait for thread exit. - let _ = rx.recv().unwrap(); - info!(logger, "debug monitor thread has exited"); - - // close files - let _ = close(rfd); - let _ = close(master_fd); - let _ = close(slave_fd); - } - Err(err) => { - return Err(anyhow!("fork error: {:?}", err)); - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::tempdir; - - #[test] - fn test_setup_debug_console_no_shells() { - // Guarantee no shells have been added - // (required to avoid racing with - // test_setup_debug_console_invalid_shell()). - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); - shells.clear(); - let logger = slog_scope::logger(); - - let result = setup_debug_console(&logger, shells.to_vec(), 0); - - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "no shell found to launch debug console" - ); - } - - #[test] - fn test_setup_debug_console_invalid_shell() { - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); - - let dir = tempdir().expect("failed to create tmpdir"); - - // Add an invalid shell - let shell = dir - .path() - .join("enoent") - .to_str() - .expect("failed to construct shell path") - .to_string(); - - shells.push(shell); - let logger = slog_scope::logger(); - - let result = setup_debug_console(&logger, shells.to_vec(), 0); - - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "no shell found to launch debug console" - ); - } -} From 790332575b1585cbba66ad49158587c5708ac860 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Mon, 29 Mar 2021 22:06:41 +0800 Subject: [PATCH 2/3] agent: async the debug console Make the debug console in this commit. Finish the rework of debug console. Fixes: #1647 Signed-off-by: Tim Zhang --- src/agent/src/console.rs | 257 +++++++++++++++------------------------ src/agent/src/main.rs | 29 +---- src/agent/src/util.rs | 13 ++ 3 files changed, 110 insertions(+), 189 deletions(-) diff --git a/src/agent/src/console.rs b/src/agent/src/console.rs index a535836d5e..ac067109b2 100644 --- a/src/agent/src/console.rs +++ b/src/agent/src/console.rs @@ -4,10 +4,9 @@ // SPDX-License-Identifier: Apache-2.0 // +use crate::util; use anyhow::{anyhow, Result}; -use lazy_static; -use nix::fcntl::{self, OFlag}; -use nix::fcntl::{FcntlArg, FdFlag}; +use nix::fcntl::{self, FcntlArg, FdFlag, OFlag}; use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; use nix::pty::{openpty, OpenptyResult}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; @@ -16,18 +15,19 @@ use nix::sys::wait; use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid}; use rustjail::pipestream::PipeStream; use slog::Logger; -use std::ffi::{CStr, CString}; -use std::fs::File; -use std::io::{Read, Write}; +use std::ffi::CString; use std::os::unix::io::{FromRawFd, RawFd}; use std::path::PathBuf; +use std::process::Stdio; use std::sync::Arc; use std::sync::Mutex as SyncMutex; + +use futures::StreamExt; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::select; use tokio::sync::watch::Receiver; -use tokio::{pin, select}; const CONSOLE_PATH: &str = "/dev/console"; -const DEFAULT_BUF_SIZE: usize = 8 * 1024; lazy_static! { static ref SHELLS: Arc>> = { @@ -53,16 +53,13 @@ pub async fn debug_console_handler( ) -> Result<()> { let logger = logger.new(o!("subsystem" => "debug-console")); - let shells = SHELLS.clone(); - let shells = shells.lock().unwrap().to_vec(); + let shells = SHELLS.lock().unwrap().to_vec(); let shell = shells - .iter() + .into_iter() .find(|sh| PathBuf::from(sh).exists()) .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; - let fd: RawFd; - if port > 0 { let listenfd = socket::socket( AddressFamily::Vsock, @@ -74,40 +71,66 @@ pub async fn debug_console_handler( socket::bind(listenfd, &addr)?; socket::listen(listenfd, 1)?; - fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?; + let mut incoming = util::get_vsock_incoming(listenfd); + + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "debug console got shutdown request"); + break; + } + + conn = incoming.next() => { + if let Some(conn) = conn { + // Accept a new connection + match conn { + Ok(stream) => { + let logger = logger.clone(); + let shell = shell.clone(); + // Do not block(await) here, or we'll never receive the shutdown signal + tokio::spawn(async move { + let _ = run_debug_console_vsock(logger, shell, stream).await; + }); + } + Err(e) => { + error!(logger, "{:?}", e); + } + } + } else { + break; + } + } + } + } } else { let mut flags = OFlag::empty(); flags.insert(OFlag::O_RDWR); flags.insert(OFlag::O_CLOEXEC); - fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - }; + let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - loop { select! { _ = shutdown.changed() => { - info!(logger, "got shutdown request"); - break; + info!(logger, "debug console got shutdown request"); } - // BUG: FIXME: wait on parent. - //result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => { - // match result { - // Ok(_) => { - // info!(logger, "run_debug_console_shell session finished"); - // } - // Err(err) => { - // error!(logger, "run_debug_console_shell failed: {:?}", err); - // } - // } - //} + result = run_debug_console_serial(shell.clone(), fd) => { + match result { + Ok(_) => { + info!(logger, "run_debug_console_shell session finished"); + } + Err(err) => { + error!(logger, "run_debug_console_shell failed: {:?}", err); + } + } + } } - } + }; Ok(()) } -fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { +fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> { // create new session with child as session leader setsid()?; @@ -122,10 +145,9 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { } let cmd = CString::new(shell).unwrap(); - let args: Vec<&CStr> = vec![]; // run shell - let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e { + let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e { nix::Error::Sys(errno) => { std::process::exit(errno as i32); } @@ -135,122 +157,45 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { Ok(()) } -async fn run_in_parent( +async fn run_in_parent( logger: Logger, - mut shutdown: Receiver, - socket_fd: RawFd, + stream: T, pseudo: OpenptyResult, child_pid: Pid, ) -> Result<()> { info!(logger, "get debug shell pid {:?}", child_pid); - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; let master_fd = pseudo.master; - let slave_fd = pseudo.slave; - //let debug_shell_logger = logger.clone(); + let _ = close(pseudo.slave); - let logger = logger.clone(); - - // channel that used to sync between thread and main process - let (tx, rx) = std::sync::mpsc::channel::(); - - // start a thread to do IO copy between socket and pseudo.master - //tokio::spawn(async move { - //let logger = logger.clone(); - //let mut shutdown = shutdown.clone(); - - //let mut master_reader = unsafe { File::from_raw_fd(master_fd) }; - //let mut master_writer = unsafe { File::from_raw_fd(master_fd) }; - //let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) }; - //let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) }; - - let mut pipe_reader = PipeStream::from_fd(rfd); - - //let mut pty_master_reader = PipeStream::from_fd(master_fd); - //let mut socket_reader = PipeStream::from_fd(socket_fd); - - //pin!(pipe_reader); - - // BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd - // (see commented out code below). - loop { - select! { - _ = shutdown.changed() => { - info!(logger, "got shutdown request"); - break; - }, - _ = pipe_reader => { - info!( - debug_shell_logger, - "debug shell process {} exited", child_pid - ); - tx.send(1).unwrap(); - }, + let (mut socket_reader, mut socket_writer) = tokio::io::split(stream); + let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd)); + select! { + res = tokio::io::copy(&mut master_reader, &mut socket_writer) => { + debug!( + logger, + "master closed: {:?}", res + ); + } + res = tokio::io::copy(&mut socket_reader, &mut master_writer) => { + info!( + logger, + "socket closed: {:?}", res + ); } } - // if fd_set.contains(master_fd) { - // match io_copy(&mut master_reader, &mut socket_writer) { - // Ok(0) => { - // debug!(debug_shell_logger, "master fd closed"); - // tx.send(1).unwrap(); - // break; - // } - // Ok(_) => {} - // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - // Err(e) => { - // error!(debug_shell_logger, "read master fd error {:?}", e); - // tx.send(1).unwrap(); - // break; - // } - // } - // } - - // if fd_set.contains(socket_fd) { - // match io_copy(&mut socket_reader, &mut master_writer) { - // Ok(0) => { - // debug!(debug_shell_logger, "socket fd closed"); - // tx.send(1).unwrap(); - // break; - // } - // Ok(_) => {} - // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - // Err(e) => { - // error!(debug_shell_logger, "read socket fd error {:?}", e); - // tx.send(1).unwrap(); - // break; - // } - // } - // } - //} - //}) - //.await; - let wait_status = wait::waitpid(child_pid, None); info!(logger, "debug console process exit code: {:?}", wait_status); - info!(logger, "notify debug monitor thread to exit"); - // close pipe to exit select loop - let _ = close(wfd); - - // wait for thread exit. - let _ = rx.recv().unwrap(); - info!(logger, "debug monitor thread has exited"); - - // close files - let _ = close(rfd); - let _ = close(master_fd); - let _ = close(slave_fd); - Ok(()) } -async fn run_debug_console_shell( +async fn run_debug_console_vsock( logger: Logger, - shell: &str, - socket_fd: RawFd, - shutdown: Receiver, + shell: String, + stream: T, ) -> Result<()> { let logger = logger.new(o!("subsystem" => "debug-console-shell")); @@ -263,19 +208,30 @@ async fn run_debug_console_shell( match fork() { Ok(ForkResult::Child) => run_in_child(slave_fd, shell), Ok(ForkResult::Parent { child: child_pid }) => { - run_in_parent( - logger.clone(), - shutdown.clone(), - socket_fd, - pseudo, - child_pid, - ) - .await + run_in_parent(logger.clone(), stream, pseudo, child_pid).await } Err(err) => Err(anyhow!("fork error: {:?}", err)), } } +async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> { + let mut child = match tokio::process::Command::new(shell) + .arg("-i") + .kill_on_drop(true) + .stdin(unsafe { Stdio::from_raw_fd(fd) }) + .stdout(unsafe { Stdio::from_raw_fd(fd) }) + .stderr(unsafe { Stdio::from_raw_fd(fd) }) + .spawn() + { + Ok(c) => c, + Err(_) => return Err(anyhow!("failed to spawn shell")), + }; + + child.wait().await?; + + Ok(()) +} + // BUG: FIXME: #[cfg(test)] mod tests { @@ -328,26 +284,3 @@ mod tests { ); } } - -// BUG: FIXME: should not be required as we can use the -// interruptable_io_copier(). But if it is still needed, move to utils.rs. -fn io_copy(reader: &mut R, writer: &mut W) -> std::io::Result -where - R: Read, - W: Write, -{ - let mut buf = [0; DEFAULT_BUF_SIZE]; - let buf_len; - - match reader.read(&mut buf) { - Ok(0) => return Ok(0), - Ok(len) => buf_len = len, - Err(err) => return Err(err), - }; - - // write and return - match writer.write_all(&buf[..buf_len]) { - Ok(_) => Ok(buf_len as u64), - Err(err) => Err(err), - } -} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index fd6dcfe846..595951bc5e 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -20,11 +20,9 @@ extern crate scopeguard; extern crate slog; use anyhow::{anyhow, Context, Result}; -use nix::fcntl::{self, OFlag}; -use nix::fcntl::{FcntlArg, FdFlag}; +use nix::fcntl::OFlag; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; use nix::unistd::{self, dup, Pid}; -use std::collections::HashMap; use std::env; use std::ffi::OsStr; use std::fs::{self, File}; @@ -61,7 +59,6 @@ use slog::Logger; use uevent::watch_uevents; use futures::future::join_all; -use futures::StreamExt as _; use rustjail::pipestream::PipeStream; use tokio::{ io::AsyncWrite, @@ -71,7 +68,6 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_vsock::{Incoming, VsockListener, VsockStream}; mod rpc; @@ -96,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) { ); } -fn set_fd_close_exec(fd: RawFd) -> Result { - if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) { - return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e)); - } - Ok(fd) -} - -fn get_vsock_incoming(fd: RawFd) -> Incoming { - let incoming; - unsafe { - incoming = VsockListener::from_raw_fd(fd).incoming(); - } - incoming -} - -async fn get_vsock_stream(fd: RawFd) -> Result { - let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap(); - set_fd_close_exec(stream.as_raw_fd())?; - Ok(stream) -} - // Create a thread to handle reading from the logger pipe. The thread will // output to the vsock port specified, or stdout. async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver) -> Result<()> { @@ -135,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver Incoming { + unsafe { VsockListener::from_raw_fd(fd).incoming() } +} + +pub async fn get_vsock_stream(fd: RawFd) -> Result { + let stream = get_vsock_incoming(fd).next().await.unwrap()?; + Ok(stream) +} + #[cfg(test)] mod tests { use super::*; From 24b0703fdafb90589dfde59d2fe858c55dabcfdf Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Wed, 7 Apr 2021 21:03:10 +0800 Subject: [PATCH 3/3] agent: fix test for the debug console Fix test for the debug console. Signed-off-by: Tim Zhang --- src/agent/src/console.rs | 58 +++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/agent/src/console.rs b/src/agent/src/console.rs index ac067109b2..97aa95d4e2 100644 --- a/src/agent/src/console.rs +++ b/src/agent/src/console.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Ant Financial +// Copyright (c) 2021 Ant Group // Copyright (c) 2021 Intel Corporation // // SPDX-License-Identifier: Apache-2.0 @@ -232,23 +232,27 @@ async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> { Ok(()) } -// BUG: FIXME: #[cfg(test)] mod tests { use super::*; use tempfile::tempdir; + use tokio::sync::watch; + + #[tokio::test] + async fn test_setup_debug_console_no_shells() { + { + // Guarantee no shells have been added + // (required to avoid racing with + // test_setup_debug_console_invalid_shell()). + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); + shells.clear(); + } - #[test] - fn test_setup_debug_console_no_shells() { - // Guarantee no shells have been added - // (required to avoid racing with - // test_setup_debug_console_invalid_shell()). - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); - shells.clear(); let logger = slog_scope::logger(); - let result = setup_debug_console(&logger, shells.to_vec(), 0); + let (_, rx) = watch::channel(true); + let result = debug_console_handler(logger, 0, rx).await; assert!(result.is_err()); assert_eq!( @@ -257,25 +261,29 @@ mod tests { ); } - #[test] - fn test_setup_debug_console_invalid_shell() { - let shells_ref = SHELLS.clone(); - let mut shells = shells_ref.lock().unwrap(); + #[tokio::test] + async fn test_setup_debug_console_invalid_shell() { + { + let shells_ref = SHELLS.clone(); + let mut shells = shells_ref.lock().unwrap(); - let dir = tempdir().expect("failed to create tmpdir"); + let dir = tempdir().expect("failed to create tmpdir"); - // Add an invalid shell - let shell = dir - .path() - .join("enoent") - .to_str() - .expect("failed to construct shell path") - .to_string(); + // Add an invalid shell + let shell = dir + .path() + .join("enoent") + .to_str() + .expect("failed to construct shell path") + .to_string(); + + shells.push(shell); + } - shells.push(shell); let logger = slog_scope::logger(); - let result = setup_debug_console(&logger, shells.to_vec(), 0); + let (_, rx) = watch::channel(true); + let result = debug_console_handler(logger, 0, rx).await; assert!(result.is_err()); assert_eq!(