agent: start to rework the debug console

It's the first commit of the rework.

Fixes: #1647

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-03-26 17:35:06 +00:00 committed by Tim Zhang
parent 15c2d7ed30
commit 9017e1100b
2 changed files with 367 additions and 314 deletions

353
src/agent/src/console.rs Normal file
View File

@ -0,0 +1,353 @@
// Copyright (c) 2019 Ant Financial
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{anyhow, Result};
use lazy_static;
use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag};
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty::{openpty, OpenptyResult};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::stat::Mode;
use nix::sys::wait;
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
use rustjail::pipestream::PipeStream;
use slog::Logger;
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io::{Read, Write};
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex as SyncMutex;
use tokio::sync::watch::Receiver;
use tokio::{pin, select};
const CONSOLE_PATH: &str = "/dev/console";
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
lazy_static! {
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
let mut v = Vec::new();
if !cfg!(test) {
v.push("/bin/bash".to_string());
v.push("/bin/sh".to_string());
}
Arc::new(SyncMutex::new(v))
};
}
pub fn initialize() {
lazy_static::initialize(&SHELLS);
}
pub async fn debug_console_handler(
logger: Logger,
port: u32,
mut shutdown: Receiver<bool>,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console"));
let shells = SHELLS.clone();
let shells = shells.lock().unwrap().to_vec();
let shell = shells
.iter()
.find(|sh| PathBuf::from(sh).exists())
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
let fd: RawFd;
if port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port);
socket::bind(listenfd, &addr)?;
socket::listen(listenfd, 1)?;
fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
} else {
let mut flags = OFlag::empty();
flags.insert(OFlag::O_RDWR);
flags.insert(OFlag::O_CLOEXEC);
fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
};
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
}
// BUG: FIXME: wait on parent.
//result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => {
// match result {
// Ok(_) => {
// info!(logger, "run_debug_console_shell session finished");
// }
// Err(err) => {
// error!(logger, "run_debug_console_shell failed: {:?}", err);
// }
// }
//}
}
}
Ok(())
}
fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
// create new session with child as session leader
setsid()?;
// dup stdin, stdout, stderr to let child act as a terminal
dup2(slave_fd, STDIN_FILENO)?;
dup2(slave_fd, STDOUT_FILENO)?;
dup2(slave_fd, STDERR_FILENO)?;
// set tty
unsafe {
libc::ioctl(0, libc::TIOCSCTTY);
}
let cmd = CString::new(shell).unwrap();
let args: Vec<&CStr> = vec![];
// run shell
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
}
_ => std::process::exit(-2),
});
Ok(())
}
async fn run_in_parent(
logger: Logger,
mut shutdown: Receiver<bool>,
socket_fd: RawFd,
pseudo: OpenptyResult,
child_pid: Pid,
) -> Result<()> {
info!(logger, "get debug shell pid {:?}", child_pid);
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let master_fd = pseudo.master;
let slave_fd = pseudo.slave;
//let debug_shell_logger = logger.clone();
let logger = logger.clone();
// channel that used to sync between thread and main process
let (tx, rx) = std::sync::mpsc::channel::<i32>();
// start a thread to do IO copy between socket and pseudo.master
//tokio::spawn(async move {
//let logger = logger.clone();
//let mut shutdown = shutdown.clone();
//let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
//let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
//let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
//let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
let mut pipe_reader = PipeStream::from_fd(rfd);
//let mut pty_master_reader = PipeStream::from_fd(master_fd);
//let mut socket_reader = PipeStream::from_fd(socket_fd);
//pin!(pipe_reader);
// BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd
// (see commented out code below).
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
},
_ = pipe_reader => {
info!(
debug_shell_logger,
"debug shell process {} exited", child_pid
);
tx.send(1).unwrap();
},
}
}
// if fd_set.contains(master_fd) {
// match io_copy(&mut master_reader, &mut socket_writer) {
// Ok(0) => {
// debug!(debug_shell_logger, "master fd closed");
// tx.send(1).unwrap();
// break;
// }
// Ok(_) => {}
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
// Err(e) => {
// error!(debug_shell_logger, "read master fd error {:?}", e);
// tx.send(1).unwrap();
// break;
// }
// }
// }
// if fd_set.contains(socket_fd) {
// match io_copy(&mut socket_reader, &mut master_writer) {
// Ok(0) => {
// debug!(debug_shell_logger, "socket fd closed");
// tx.send(1).unwrap();
// break;
// }
// Ok(_) => {}
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
// Err(e) => {
// error!(debug_shell_logger, "read socket fd error {:?}", e);
// tx.send(1).unwrap();
// break;
// }
// }
// }
//}
//})
//.await;
let wait_status = wait::waitpid(child_pid, None);
info!(logger, "debug console process exit code: {:?}", wait_status);
info!(logger, "notify debug monitor thread to exit");
// close pipe to exit select loop
let _ = close(wfd);
// wait for thread exit.
let _ = rx.recv().unwrap();
info!(logger, "debug monitor thread has exited");
// close files
let _ = close(rfd);
let _ = close(master_fd);
let _ = close(slave_fd);
Ok(())
}
async fn run_debug_console_shell(
logger: Logger,
shell: &str,
socket_fd: RawFd,
shutdown: Receiver<bool>,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
let pseudo = openpty(None, None)?;
let _ = fcntl::fcntl(pseudo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let _ = fcntl::fcntl(pseudo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let slave_fd = pseudo.slave;
match fork() {
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
Ok(ForkResult::Parent { child: child_pid }) => {
run_in_parent(
logger.clone(),
shutdown.clone(),
socket_fd,
pseudo,
child_pid,
)
.await
}
Err(err) => Err(anyhow!("fork error: {:?}", err)),
}
}
// BUG: FIXME:
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_setup_debug_console_no_shells() {
// Guarantee no shells have been added
// (required to avoid racing with
// test_setup_debug_console_invalid_shell()).
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
shells.clear();
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
#[test]
fn test_setup_debug_console_invalid_shell() {
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
let dir = tempdir().expect("failed to create tmpdir");
// Add an invalid shell
let shell = dir
.path()
.join("enoent")
.to_str()
.expect("failed to construct shell path")
.to_string();
shells.push(shell);
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
}
// BUG: FIXME: should not be required as we can use the
// interruptable_io_copier(). But if it is still needed, move to utils.rs.
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = [0; DEFAULT_BUF_SIZE];
let buf_len;
match reader.read(&mut buf) {
Ok(0) => return Ok(0),
Ok(len) => buf_len = len,
Err(err) => return Err(err),
};
// write and return
match writer.write_all(&buf[..buf_len]) {
Ok(_) => Ok(buf_len as u64),
Err(err) => Err(err),
}
}

View File

@ -22,24 +22,21 @@ extern crate slog;
use anyhow::{anyhow, Context, Result};
use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag};
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty;
use nix::sys::select::{select, FdSet};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::wait;
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
use nix::unistd::{self, dup, Pid};
use std::collections::HashMap;
use std::env;
use std::ffi::{CStr, CString, OsStr};
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs as unixfs;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::process::exit;
use std::sync::Arc;
use unistd::Pid;
mod config;
mod console;
mod device;
mod linux_abi;
mod metrics;
@ -63,8 +60,6 @@ use signal::setup_signal_handler;
use slog::Logger;
use uevent::watch_uevents;
use std::sync::Mutex as SyncMutex;
use futures::future::join_all;
use futures::StreamExt as _;
use rustjail::pipestream::PipeStream;
@ -82,9 +77,6 @@ mod rpc;
const NAME: &str = "kata-agent";
const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline";
const CONSOLE_PATH: &str = "/dev/console";
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
lazy_static! {
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> =
@ -159,7 +151,7 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// List of tasks that need to be stopped for a clean shutdown
let mut tasks: Vec<JoinHandle<Result<()>>> = vec![];
lazy_static::initialize(&SHELLS);
console::initialize();
lazy_static::initialize(&AGENT_CONFIG);
@ -299,26 +291,17 @@ async fn start_sandbox(
tasks: &mut Vec<JoinHandle<Result<()>>>,
shutdown: Receiver<bool>,
) -> Result<()> {
let shells = SHELLS.clone();
let debug_console_vport = config.debug_console_vport as u32;
let shell_handle = if config.debug_console {
let thread_logger = logger.clone();
let shells = shells.lock().unwrap().to_vec();
if config.debug_console {
let debug_console_task = tokio::task::spawn(console::debug_console_handler(
logger.clone(),
debug_console_vport,
shutdown.clone(),
));
let handle = tokio::task::spawn_blocking(move || {
let result = setup_debug_console(&thread_logger, shells, debug_console_vport);
if result.is_err() {
// Report error, but don't fail
warn!(thread_logger, "failed to setup debug console";
"error" => format!("{}", result.unwrap_err()));
}
});
Some(handle)
} else {
None
};
tasks.push(debug_console_task);
}
// Initialize unique sandbox structure.
let s = Sandbox::new(&logger).context("Failed to create sandbox")?;
@ -350,10 +333,6 @@ async fn start_sandbox(
let _ = rx.await?;
server.shutdown().await?;
if let Some(handle) = shell_handle {
handle.await.map_err(|e| anyhow!("{:?}", e))?;
}
Ok(())
}
@ -404,284 +383,5 @@ fn sethostname(hostname: &OsStr) -> Result<()> {
}
}
lazy_static! {
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
let mut v = Vec::new();
if !cfg!(test) {
v.push("/bin/bash".to_string());
v.push("/bin/sh".to_string());
}
Arc::new(SyncMutex::new(v))
};
}
use crate::config::AgentConfig;
use nix::sys::stat::Mode;
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::process::exit;
fn setup_debug_console(logger: &Logger, shells: Vec<String>, port: u32) -> Result<()> {
let shell = shells
.iter()
.find(|sh| PathBuf::from(sh).exists())
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
if port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port);
socket::bind(listenfd, &addr)?;
socket::listen(listenfd, 1)?;
loop {
let f: RawFd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
match run_debug_console_shell(logger, shell, f) {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
} else {
let mut flags = OFlag::empty();
flags.insert(OFlag::O_RDWR);
flags.insert(OFlag::O_CLOEXEC);
loop {
let f: RawFd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
match run_debug_console_shell(logger, shell, f) {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
};
}
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = [0; DEFAULT_BUF_SIZE];
let buf_len;
match reader.read(&mut buf) {
Ok(0) => return Ok(0),
Ok(len) => buf_len = len,
Err(err) => return Err(err),
};
// write and return
match writer.write_all(&buf[..buf_len]) {
Ok(_) => Ok(buf_len as u64),
Err(err) => Err(err),
}
}
fn run_debug_console_shell(logger: &Logger, shell: &str, socket_fd: RawFd) -> Result<()> {
let pseduo = pty::openpty(None, None)?;
let _ = fcntl::fcntl(pseduo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let _ = fcntl::fcntl(pseduo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let slave_fd = pseduo.slave;
match fork() {
Ok(ForkResult::Child) => {
// create new session with child as session leader
setsid()?;
// dup stdin, stdout, stderr to let child act as a terminal
dup2(slave_fd, STDIN_FILENO)?;
dup2(slave_fd, STDOUT_FILENO)?;
dup2(slave_fd, STDERR_FILENO)?;
// set tty
unsafe {
libc::ioctl(0, libc::TIOCSCTTY);
}
let cmd = CString::new(shell).unwrap();
let args: Vec<&CStr> = vec![];
// run shell
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
}
_ => std::process::exit(-2),
});
}
Ok(ForkResult::Parent { child: child_pid }) => {
info!(logger, "get debug shell pid {:?}", child_pid);
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let master_fd = pseduo.master;
let debug_shell_logger = logger.clone();
// channel that used to sync between thread and main process
let (tx, rx) = std::sync::mpsc::channel::<i32>();
// start a thread to do IO copy between socket and pseduo.master
std::thread::spawn(move || {
let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
loop {
let mut fd_set = FdSet::new();
fd_set.insert(rfd);
fd_set.insert(master_fd);
fd_set.insert(socket_fd);
match select(
Some(fd_set.highest().unwrap() + 1),
&mut fd_set,
None,
None,
None,
) {
Ok(_) => (),
Err(e) => {
if e == nix::Error::from(nix::errno::Errno::EINTR) {
continue;
} else {
error!(debug_shell_logger, "select error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
if fd_set.contains(rfd) {
info!(
debug_shell_logger,
"debug shell process {} exited", child_pid
);
tx.send(1).unwrap();
break;
}
if fd_set.contains(master_fd) {
match io_copy(&mut master_reader, &mut socket_writer) {
Ok(0) => {
debug!(debug_shell_logger, "master fd closed");
tx.send(1).unwrap();
break;
}
Ok(_) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
error!(debug_shell_logger, "read master fd error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
if fd_set.contains(socket_fd) {
match io_copy(&mut socket_reader, &mut master_writer) {
Ok(0) => {
debug!(debug_shell_logger, "socket fd closed");
tx.send(1).unwrap();
break;
}
Ok(_) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
error!(debug_shell_logger, "read socket fd error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
}
});
let wait_status = wait::waitpid(child_pid, None);
info!(logger, "debug console process exit code: {:?}", wait_status);
info!(logger, "notify debug monitor thread to exit");
// close pipe to exit select loop
let _ = close(wfd);
// wait for thread exit.
let _ = rx.recv().unwrap();
info!(logger, "debug monitor thread has exited");
// close files
let _ = close(rfd);
let _ = close(master_fd);
let _ = close(slave_fd);
}
Err(err) => {
return Err(anyhow!("fork error: {:?}", err));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_setup_debug_console_no_shells() {
// Guarantee no shells have been added
// (required to avoid racing with
// test_setup_debug_console_invalid_shell()).
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
shells.clear();
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
#[test]
fn test_setup_debug_console_invalid_shell() {
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
let dir = tempdir().expect("failed to create tmpdir");
// Add an invalid shell
let shell = dir
.path()
.join("enoent")
.to_str()
.expect("failed to construct shell path")
.to_string();
shells.push(shell);
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
}