agent: async the debug console

Make the debug console in this commit.
Finish the rework of debug console.

Fixes: #1647

Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
Tim Zhang 2021-03-29 22:06:41 +08:00
parent 9017e1100b
commit 790332575b
3 changed files with 110 additions and 189 deletions

View File

@ -4,10 +4,9 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::util;
use anyhow::{anyhow, Result};
use lazy_static;
use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag};
use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty::{openpty, OpenptyResult};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
@ -16,18 +15,19 @@ use nix::sys::wait;
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
use rustjail::pipestream::PipeStream;
use slog::Logger;
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io::{Read, Write};
use std::ffi::CString;
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex as SyncMutex;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::select;
use tokio::sync::watch::Receiver;
use tokio::{pin, select};
const CONSOLE_PATH: &str = "/dev/console";
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
lazy_static! {
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
@ -53,16 +53,13 @@ pub async fn debug_console_handler(
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console"));
let shells = SHELLS.clone();
let shells = shells.lock().unwrap().to_vec();
let shells = SHELLS.lock().unwrap().to_vec();
let shell = shells
.iter()
.into_iter()
.find(|sh| PathBuf::from(sh).exists())
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
let fd: RawFd;
if port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
@ -74,40 +71,66 @@ pub async fn debug_console_handler(
socket::bind(listenfd, &addr)?;
socket::listen(listenfd, 1)?;
fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
let mut incoming = util::get_vsock_incoming(listenfd);
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "debug console got shutdown request");
break;
}
conn = incoming.next() => {
if let Some(conn) = conn {
// Accept a new connection
match conn {
Ok(stream) => {
let logger = logger.clone();
let shell = shell.clone();
// Do not block(await) here, or we'll never receive the shutdown signal
tokio::spawn(async move {
let _ = run_debug_console_vsock(logger, shell, stream).await;
});
}
Err(e) => {
error!(logger, "{:?}", e);
}
}
} else {
break;
}
}
}
}
} else {
let mut flags = OFlag::empty();
flags.insert(OFlag::O_RDWR);
flags.insert(OFlag::O_CLOEXEC);
fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
};
let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
info!(logger, "debug console got shutdown request");
}
// BUG: FIXME: wait on parent.
//result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => {
// match result {
// Ok(_) => {
// info!(logger, "run_debug_console_shell session finished");
// }
// Err(err) => {
// error!(logger, "run_debug_console_shell failed: {:?}", err);
// }
// }
//}
result = run_debug_console_serial(shell.clone(), fd) => {
match result {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
}
}
};
Ok(())
}
fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> {
// create new session with child as session leader
setsid()?;
@ -122,10 +145,9 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
}
let cmd = CString::new(shell).unwrap();
let args: Vec<&CStr> = vec![];
// run shell
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
}
@ -135,122 +157,45 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
Ok(())
}
async fn run_in_parent(
async fn run_in_parent<T: AsyncRead + AsyncWrite>(
logger: Logger,
mut shutdown: Receiver<bool>,
socket_fd: RawFd,
stream: T,
pseudo: OpenptyResult,
child_pid: Pid,
) -> Result<()> {
info!(logger, "get debug shell pid {:?}", child_pid);
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let master_fd = pseudo.master;
let slave_fd = pseudo.slave;
//let debug_shell_logger = logger.clone();
let _ = close(pseudo.slave);
let logger = logger.clone();
// channel that used to sync between thread and main process
let (tx, rx) = std::sync::mpsc::channel::<i32>();
// start a thread to do IO copy between socket and pseudo.master
//tokio::spawn(async move {
//let logger = logger.clone();
//let mut shutdown = shutdown.clone();
//let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
//let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
//let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
//let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
let mut pipe_reader = PipeStream::from_fd(rfd);
//let mut pty_master_reader = PipeStream::from_fd(master_fd);
//let mut socket_reader = PipeStream::from_fd(socket_fd);
//pin!(pipe_reader);
// BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd
// (see commented out code below).
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
},
_ = pipe_reader => {
info!(
debug_shell_logger,
"debug shell process {} exited", child_pid
);
tx.send(1).unwrap();
},
let (mut socket_reader, mut socket_writer) = tokio::io::split(stream);
let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd));
select! {
res = tokio::io::copy(&mut master_reader, &mut socket_writer) => {
debug!(
logger,
"master closed: {:?}", res
);
}
res = tokio::io::copy(&mut socket_reader, &mut master_writer) => {
info!(
logger,
"socket closed: {:?}", res
);
}
}
// if fd_set.contains(master_fd) {
// match io_copy(&mut master_reader, &mut socket_writer) {
// Ok(0) => {
// debug!(debug_shell_logger, "master fd closed");
// tx.send(1).unwrap();
// break;
// }
// Ok(_) => {}
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
// Err(e) => {
// error!(debug_shell_logger, "read master fd error {:?}", e);
// tx.send(1).unwrap();
// break;
// }
// }
// }
// if fd_set.contains(socket_fd) {
// match io_copy(&mut socket_reader, &mut master_writer) {
// Ok(0) => {
// debug!(debug_shell_logger, "socket fd closed");
// tx.send(1).unwrap();
// break;
// }
// Ok(_) => {}
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
// Err(e) => {
// error!(debug_shell_logger, "read socket fd error {:?}", e);
// tx.send(1).unwrap();
// break;
// }
// }
// }
//}
//})
//.await;
let wait_status = wait::waitpid(child_pid, None);
info!(logger, "debug console process exit code: {:?}", wait_status);
info!(logger, "notify debug monitor thread to exit");
// close pipe to exit select loop
let _ = close(wfd);
// wait for thread exit.
let _ = rx.recv().unwrap();
info!(logger, "debug monitor thread has exited");
// close files
let _ = close(rfd);
let _ = close(master_fd);
let _ = close(slave_fd);
Ok(())
}
async fn run_debug_console_shell(
async fn run_debug_console_vsock<T: AsyncRead + AsyncWrite>(
logger: Logger,
shell: &str,
socket_fd: RawFd,
shutdown: Receiver<bool>,
shell: String,
stream: T,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
@ -263,19 +208,30 @@ async fn run_debug_console_shell(
match fork() {
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
Ok(ForkResult::Parent { child: child_pid }) => {
run_in_parent(
logger.clone(),
shutdown.clone(),
socket_fd,
pseudo,
child_pid,
)
.await
run_in_parent(logger.clone(), stream, pseudo, child_pid).await
}
Err(err) => Err(anyhow!("fork error: {:?}", err)),
}
}
async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> {
let mut child = match tokio::process::Command::new(shell)
.arg("-i")
.kill_on_drop(true)
.stdin(unsafe { Stdio::from_raw_fd(fd) })
.stdout(unsafe { Stdio::from_raw_fd(fd) })
.stderr(unsafe { Stdio::from_raw_fd(fd) })
.spawn()
{
Ok(c) => c,
Err(_) => return Err(anyhow!("failed to spawn shell")),
};
child.wait().await?;
Ok(())
}
// BUG: FIXME:
#[cfg(test)]
mod tests {
@ -328,26 +284,3 @@ mod tests {
);
}
}
// BUG: FIXME: should not be required as we can use the
// interruptable_io_copier(). But if it is still needed, move to utils.rs.
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = [0; DEFAULT_BUF_SIZE];
let buf_len;
match reader.read(&mut buf) {
Ok(0) => return Ok(0),
Ok(len) => buf_len = len,
Err(err) => return Err(err),
};
// write and return
match writer.write_all(&buf[..buf_len]) {
Ok(_) => Ok(buf_len as u64),
Err(err) => Err(err),
}
}

View File

@ -20,11 +20,9 @@ extern crate scopeguard;
extern crate slog;
use anyhow::{anyhow, Context, Result};
use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag};
use nix::fcntl::OFlag;
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::unistd::{self, dup, Pid};
use std::collections::HashMap;
use std::env;
use std::ffi::OsStr;
use std::fs::{self, File};
@ -61,7 +59,6 @@ use slog::Logger;
use uevent::watch_uevents;
use futures::future::join_all;
use futures::StreamExt as _;
use rustjail::pipestream::PipeStream;
use tokio::{
io::AsyncWrite,
@ -71,7 +68,6 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_vsock::{Incoming, VsockListener, VsockStream};
mod rpc;
@ -96,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) {
);
}
fn set_fd_close_exec(fd: RawFd) -> Result<RawFd> {
if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) {
return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e));
}
Ok(fd)
}
fn get_vsock_incoming(fd: RawFd) -> Incoming {
let incoming;
unsafe {
incoming = VsockListener::from_raw_fd(fd).incoming();
}
incoming
}
async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap();
set_fd_close_exec(stream.as_raw_fd())?;
Ok(stream)
}
// Create a thread to handle reading from the logger pipe. The thread will
// output to the vsock port specified, or stdout.
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
@ -135,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
writer = Box::new(get_vsock_stream(listenfd).await.unwrap());
writer = Box::new(util::get_vsock_stream(listenfd).await.unwrap());
} else {
writer = Box::new(tokio::io::stdout());
}

View File

@ -3,10 +3,14 @@
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use futures::StreamExt;
use std::io;
use std::io::ErrorKind;
use std::os::unix::io::{FromRawFd, RawFd};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch::Receiver;
use tokio_vsock::{Incoming, VsockListener, VsockStream};
// Size of I/O read buffer
const BUF_SIZE: usize = 8192;
@ -52,6 +56,15 @@ where
Ok(total_bytes)
}
pub fn get_vsock_incoming(fd: RawFd) -> Incoming {
unsafe { VsockListener::from_raw_fd(fd).incoming() }
}
pub async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
let stream = get_vsock_incoming(fd).next().await.unwrap()?;
Ok(stream)
}
#[cfg(test)]
mod tests {
use super::*;