diff --git a/src/agent/src/console.rs b/src/agent/src/console.rs index a535836d5e..ac067109b2 100644 --- a/src/agent/src/console.rs +++ b/src/agent/src/console.rs @@ -4,10 +4,9 @@ // SPDX-License-Identifier: Apache-2.0 // +use crate::util; use anyhow::{anyhow, Result}; -use lazy_static; -use nix::fcntl::{self, OFlag}; -use nix::fcntl::{FcntlArg, FdFlag}; +use nix::fcntl::{self, FcntlArg, FdFlag, OFlag}; use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}; use nix::pty::{openpty, OpenptyResult}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; @@ -16,18 +15,19 @@ use nix::sys::wait; use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid}; use rustjail::pipestream::PipeStream; use slog::Logger; -use std::ffi::{CStr, CString}; -use std::fs::File; -use std::io::{Read, Write}; +use std::ffi::CString; use std::os::unix::io::{FromRawFd, RawFd}; use std::path::PathBuf; +use std::process::Stdio; use std::sync::Arc; use std::sync::Mutex as SyncMutex; + +use futures::StreamExt; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::select; use tokio::sync::watch::Receiver; -use tokio::{pin, select}; const CONSOLE_PATH: &str = "/dev/console"; -const DEFAULT_BUF_SIZE: usize = 8 * 1024; lazy_static! { static ref SHELLS: Arc>> = { @@ -53,16 +53,13 @@ pub async fn debug_console_handler( ) -> Result<()> { let logger = logger.new(o!("subsystem" => "debug-console")); - let shells = SHELLS.clone(); - let shells = shells.lock().unwrap().to_vec(); + let shells = SHELLS.lock().unwrap().to_vec(); let shell = shells - .iter() + .into_iter() .find(|sh| PathBuf::from(sh).exists()) .ok_or_else(|| anyhow!("no shell found to launch debug console"))?; - let fd: RawFd; - if port > 0 { let listenfd = socket::socket( AddressFamily::Vsock, @@ -74,40 +71,66 @@ pub async fn debug_console_handler( socket::bind(listenfd, &addr)?; socket::listen(listenfd, 1)?; - fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?; + let mut incoming = util::get_vsock_incoming(listenfd); + + loop { + select! { + _ = shutdown.changed() => { + info!(logger, "debug console got shutdown request"); + break; + } + + conn = incoming.next() => { + if let Some(conn) = conn { + // Accept a new connection + match conn { + Ok(stream) => { + let logger = logger.clone(); + let shell = shell.clone(); + // Do not block(await) here, or we'll never receive the shutdown signal + tokio::spawn(async move { + let _ = run_debug_console_vsock(logger, shell, stream).await; + }); + } + Err(e) => { + error!(logger, "{:?}", e); + } + } + } else { + break; + } + } + } + } } else { let mut flags = OFlag::empty(); flags.insert(OFlag::O_RDWR); flags.insert(OFlag::O_CLOEXEC); - fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - }; + let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?; - loop { select! { _ = shutdown.changed() => { - info!(logger, "got shutdown request"); - break; + info!(logger, "debug console got shutdown request"); } - // BUG: FIXME: wait on parent. - //result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => { - // match result { - // Ok(_) => { - // info!(logger, "run_debug_console_shell session finished"); - // } - // Err(err) => { - // error!(logger, "run_debug_console_shell failed: {:?}", err); - // } - // } - //} + result = run_debug_console_serial(shell.clone(), fd) => { + match result { + Ok(_) => { + info!(logger, "run_debug_console_shell session finished"); + } + Err(err) => { + error!(logger, "run_debug_console_shell failed: {:?}", err); + } + } + } } - } + }; Ok(()) } -fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { +fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> { // create new session with child as session leader setsid()?; @@ -122,10 +145,9 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { } let cmd = CString::new(shell).unwrap(); - let args: Vec<&CStr> = vec![]; // run shell - let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e { + let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e { nix::Error::Sys(errno) => { std::process::exit(errno as i32); } @@ -135,122 +157,45 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> { Ok(()) } -async fn run_in_parent( +async fn run_in_parent( logger: Logger, - mut shutdown: Receiver, - socket_fd: RawFd, + stream: T, pseudo: OpenptyResult, child_pid: Pid, ) -> Result<()> { info!(logger, "get debug shell pid {:?}", child_pid); - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; let master_fd = pseudo.master; - let slave_fd = pseudo.slave; - //let debug_shell_logger = logger.clone(); + let _ = close(pseudo.slave); - let logger = logger.clone(); - - // channel that used to sync between thread and main process - let (tx, rx) = std::sync::mpsc::channel::(); - - // start a thread to do IO copy between socket and pseudo.master - //tokio::spawn(async move { - //let logger = logger.clone(); - //let mut shutdown = shutdown.clone(); - - //let mut master_reader = unsafe { File::from_raw_fd(master_fd) }; - //let mut master_writer = unsafe { File::from_raw_fd(master_fd) }; - //let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) }; - //let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) }; - - let mut pipe_reader = PipeStream::from_fd(rfd); - - //let mut pty_master_reader = PipeStream::from_fd(master_fd); - //let mut socket_reader = PipeStream::from_fd(socket_fd); - - //pin!(pipe_reader); - - // BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd - // (see commented out code below). - loop { - select! { - _ = shutdown.changed() => { - info!(logger, "got shutdown request"); - break; - }, - _ = pipe_reader => { - info!( - debug_shell_logger, - "debug shell process {} exited", child_pid - ); - tx.send(1).unwrap(); - }, + let (mut socket_reader, mut socket_writer) = tokio::io::split(stream); + let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd)); + select! { + res = tokio::io::copy(&mut master_reader, &mut socket_writer) => { + debug!( + logger, + "master closed: {:?}", res + ); + } + res = tokio::io::copy(&mut socket_reader, &mut master_writer) => { + info!( + logger, + "socket closed: {:?}", res + ); } } - // if fd_set.contains(master_fd) { - // match io_copy(&mut master_reader, &mut socket_writer) { - // Ok(0) => { - // debug!(debug_shell_logger, "master fd closed"); - // tx.send(1).unwrap(); - // break; - // } - // Ok(_) => {} - // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - // Err(e) => { - // error!(debug_shell_logger, "read master fd error {:?}", e); - // tx.send(1).unwrap(); - // break; - // } - // } - // } - - // if fd_set.contains(socket_fd) { - // match io_copy(&mut socket_reader, &mut master_writer) { - // Ok(0) => { - // debug!(debug_shell_logger, "socket fd closed"); - // tx.send(1).unwrap(); - // break; - // } - // Ok(_) => {} - // Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - // Err(e) => { - // error!(debug_shell_logger, "read socket fd error {:?}", e); - // tx.send(1).unwrap(); - // break; - // } - // } - // } - //} - //}) - //.await; - let wait_status = wait::waitpid(child_pid, None); info!(logger, "debug console process exit code: {:?}", wait_status); - info!(logger, "notify debug monitor thread to exit"); - // close pipe to exit select loop - let _ = close(wfd); - - // wait for thread exit. - let _ = rx.recv().unwrap(); - info!(logger, "debug monitor thread has exited"); - - // close files - let _ = close(rfd); - let _ = close(master_fd); - let _ = close(slave_fd); - Ok(()) } -async fn run_debug_console_shell( +async fn run_debug_console_vsock( logger: Logger, - shell: &str, - socket_fd: RawFd, - shutdown: Receiver, + shell: String, + stream: T, ) -> Result<()> { let logger = logger.new(o!("subsystem" => "debug-console-shell")); @@ -263,19 +208,30 @@ async fn run_debug_console_shell( match fork() { Ok(ForkResult::Child) => run_in_child(slave_fd, shell), Ok(ForkResult::Parent { child: child_pid }) => { - run_in_parent( - logger.clone(), - shutdown.clone(), - socket_fd, - pseudo, - child_pid, - ) - .await + run_in_parent(logger.clone(), stream, pseudo, child_pid).await } Err(err) => Err(anyhow!("fork error: {:?}", err)), } } +async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> { + let mut child = match tokio::process::Command::new(shell) + .arg("-i") + .kill_on_drop(true) + .stdin(unsafe { Stdio::from_raw_fd(fd) }) + .stdout(unsafe { Stdio::from_raw_fd(fd) }) + .stderr(unsafe { Stdio::from_raw_fd(fd) }) + .spawn() + { + Ok(c) => c, + Err(_) => return Err(anyhow!("failed to spawn shell")), + }; + + child.wait().await?; + + Ok(()) +} + // BUG: FIXME: #[cfg(test)] mod tests { @@ -328,26 +284,3 @@ mod tests { ); } } - -// BUG: FIXME: should not be required as we can use the -// interruptable_io_copier(). But if it is still needed, move to utils.rs. -fn io_copy(reader: &mut R, writer: &mut W) -> std::io::Result -where - R: Read, - W: Write, -{ - let mut buf = [0; DEFAULT_BUF_SIZE]; - let buf_len; - - match reader.read(&mut buf) { - Ok(0) => return Ok(0), - Ok(len) => buf_len = len, - Err(err) => return Err(err), - }; - - // write and return - match writer.write_all(&buf[..buf_len]) { - Ok(_) => Ok(buf_len as u64), - Err(err) => Err(err), - } -} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index fd6dcfe846..595951bc5e 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -20,11 +20,9 @@ extern crate scopeguard; extern crate slog; use anyhow::{anyhow, Context, Result}; -use nix::fcntl::{self, OFlag}; -use nix::fcntl::{FcntlArg, FdFlag}; +use nix::fcntl::OFlag; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; use nix::unistd::{self, dup, Pid}; -use std::collections::HashMap; use std::env; use std::ffi::OsStr; use std::fs::{self, File}; @@ -61,7 +59,6 @@ use slog::Logger; use uevent::watch_uevents; use futures::future::join_all; -use futures::StreamExt as _; use rustjail::pipestream::PipeStream; use tokio::{ io::AsyncWrite, @@ -71,7 +68,6 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_vsock::{Incoming, VsockListener, VsockStream}; mod rpc; @@ -96,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) { ); } -fn set_fd_close_exec(fd: RawFd) -> Result { - if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) { - return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e)); - } - Ok(fd) -} - -fn get_vsock_incoming(fd: RawFd) -> Incoming { - let incoming; - unsafe { - incoming = VsockListener::from_raw_fd(fd).incoming(); - } - incoming -} - -async fn get_vsock_stream(fd: RawFd) -> Result { - let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap(); - set_fd_close_exec(stream.as_raw_fd())?; - Ok(stream) -} - // Create a thread to handle reading from the logger pipe. The thread will // output to the vsock port specified, or stdout. async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver) -> Result<()> { @@ -135,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver Incoming { + unsafe { VsockListener::from_raw_fd(fd).incoming() } +} + +pub async fn get_vsock_stream(fd: RawFd) -> Result { + let stream = get_vsock_incoming(fd).next().await.unwrap()?; + Ok(stream) +} + #[cfg(test)] mod tests { use super::*;