Merge pull request #1648 from Tim-Zhang/rework-debug-console

agent: Rework the debug console
This commit is contained in:
Fupan Li 2021-04-08 23:14:52 +08:00 committed by GitHub
commit 521887db16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 322 additions and 340 deletions

294
src/agent/src/console.rs Normal file
View File

@ -0,0 +1,294 @@
// Copyright (c) 2021 Ant Group
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::util;
use anyhow::{anyhow, Result};
use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty::{openpty, OpenptyResult};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::stat::Mode;
use nix::sys::wait;
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
use rustjail::pipestream::PipeStream;
use slog::Logger;
use std::ffi::CString;
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex as SyncMutex;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::select;
use tokio::sync::watch::Receiver;
const CONSOLE_PATH: &str = "/dev/console";
lazy_static! {
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
let mut v = Vec::new();
if !cfg!(test) {
v.push("/bin/bash".to_string());
v.push("/bin/sh".to_string());
}
Arc::new(SyncMutex::new(v))
};
}
pub fn initialize() {
lazy_static::initialize(&SHELLS);
}
pub async fn debug_console_handler(
logger: Logger,
port: u32,
mut shutdown: Receiver<bool>,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console"));
let shells = SHELLS.lock().unwrap().to_vec();
let shell = shells
.into_iter()
.find(|sh| PathBuf::from(sh).exists())
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
if port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port);
socket::bind(listenfd, &addr)?;
socket::listen(listenfd, 1)?;
let mut incoming = util::get_vsock_incoming(listenfd);
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "debug console got shutdown request");
break;
}
conn = incoming.next() => {
if let Some(conn) = conn {
// Accept a new connection
match conn {
Ok(stream) => {
let logger = logger.clone();
let shell = shell.clone();
// Do not block(await) here, or we'll never receive the shutdown signal
tokio::spawn(async move {
let _ = run_debug_console_vsock(logger, shell, stream).await;
});
}
Err(e) => {
error!(logger, "{:?}", e);
}
}
} else {
break;
}
}
}
}
} else {
let mut flags = OFlag::empty();
flags.insert(OFlag::O_RDWR);
flags.insert(OFlag::O_CLOEXEC);
let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
select! {
_ = shutdown.changed() => {
info!(logger, "debug console got shutdown request");
}
result = run_debug_console_serial(shell.clone(), fd) => {
match result {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
}
};
Ok(())
}
fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> {
// create new session with child as session leader
setsid()?;
// dup stdin, stdout, stderr to let child act as a terminal
dup2(slave_fd, STDIN_FILENO)?;
dup2(slave_fd, STDOUT_FILENO)?;
dup2(slave_fd, STDERR_FILENO)?;
// set tty
unsafe {
libc::ioctl(0, libc::TIOCSCTTY);
}
let cmd = CString::new(shell).unwrap();
// run shell
let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
}
_ => std::process::exit(-2),
});
Ok(())
}
async fn run_in_parent<T: AsyncRead + AsyncWrite>(
logger: Logger,
stream: T,
pseudo: OpenptyResult,
child_pid: Pid,
) -> Result<()> {
info!(logger, "get debug shell pid {:?}", child_pid);
let master_fd = pseudo.master;
let _ = close(pseudo.slave);
let (mut socket_reader, mut socket_writer) = tokio::io::split(stream);
let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd));
select! {
res = tokio::io::copy(&mut master_reader, &mut socket_writer) => {
debug!(
logger,
"master closed: {:?}", res
);
}
res = tokio::io::copy(&mut socket_reader, &mut master_writer) => {
info!(
logger,
"socket closed: {:?}", res
);
}
}
let wait_status = wait::waitpid(child_pid, None);
info!(logger, "debug console process exit code: {:?}", wait_status);
Ok(())
}
async fn run_debug_console_vsock<T: AsyncRead + AsyncWrite>(
logger: Logger,
shell: String,
stream: T,
) -> Result<()> {
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
let pseudo = openpty(None, None)?;
let _ = fcntl::fcntl(pseudo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let _ = fcntl::fcntl(pseudo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let slave_fd = pseudo.slave;
match fork() {
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
Ok(ForkResult::Parent { child: child_pid }) => {
run_in_parent(logger.clone(), stream, pseudo, child_pid).await
}
Err(err) => Err(anyhow!("fork error: {:?}", err)),
}
}
async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> {
let mut child = match tokio::process::Command::new(shell)
.arg("-i")
.kill_on_drop(true)
.stdin(unsafe { Stdio::from_raw_fd(fd) })
.stdout(unsafe { Stdio::from_raw_fd(fd) })
.stderr(unsafe { Stdio::from_raw_fd(fd) })
.spawn()
{
Ok(c) => c,
Err(_) => return Err(anyhow!("failed to spawn shell")),
};
child.wait().await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use tokio::sync::watch;
#[tokio::test]
async fn test_setup_debug_console_no_shells() {
{
// Guarantee no shells have been added
// (required to avoid racing with
// test_setup_debug_console_invalid_shell()).
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
shells.clear();
}
let logger = slog_scope::logger();
let (_, rx) = watch::channel(true);
let result = debug_console_handler(logger, 0, rx).await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
#[tokio::test]
async fn test_setup_debug_console_invalid_shell() {
{
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
let dir = tempdir().expect("failed to create tmpdir");
// Add an invalid shell
let shell = dir
.path()
.join("enoent")
.to_str()
.expect("failed to construct shell path")
.to_string();
shells.push(shell);
}
let logger = slog_scope::logger();
let (_, rx) = watch::channel(true);
let result = debug_console_handler(logger, 0, rx).await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
}

View File

@ -20,26 +20,21 @@ extern crate scopeguard;
extern crate slog;
use anyhow::{anyhow, Context, Result};
use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag};
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
use nix::pty;
use nix::sys::select::{select, FdSet};
use nix::fcntl::OFlag;
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::wait;
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
use nix::unistd::{self, dup, Pid};
use std::env;
use std::ffi::{CStr, CString, OsStr};
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs as unixfs;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::process::exit;
use std::sync::Arc;
use unistd::Pid;
mod config;
mod console;
mod device;
mod linux_abi;
mod metrics;
@ -63,10 +58,7 @@ use signal::setup_signal_handler;
use slog::Logger;
use uevent::watch_uevents;
use std::sync::Mutex as SyncMutex;
use futures::future::join_all;
use futures::StreamExt as _;
use rustjail::pipestream::PipeStream;
use tokio::{
io::AsyncWrite,
@ -76,15 +68,11 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_vsock::{Incoming, VsockListener, VsockStream};
mod rpc;
const NAME: &str = "kata-agent";
const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline";
const CONSOLE_PATH: &str = "/dev/console";
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
lazy_static! {
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> =
@ -104,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) {
);
}
fn set_fd_close_exec(fd: RawFd) -> Result<RawFd> {
if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) {
return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e));
}
Ok(fd)
}
fn get_vsock_incoming(fd: RawFd) -> Incoming {
let incoming;
unsafe {
incoming = VsockListener::from_raw_fd(fd).incoming();
}
incoming
}
async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap();
set_fd_close_exec(stream.as_raw_fd())?;
Ok(stream)
}
// Create a thread to handle reading from the logger pipe. The thread will
// output to the vsock port specified, or stdout.
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
@ -143,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
writer = Box::new(get_vsock_stream(listenfd).await.unwrap());
writer = Box::new(util::get_vsock_stream(listenfd).await.unwrap());
} else {
writer = Box::new(tokio::io::stdout());
}
@ -159,7 +126,7 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// List of tasks that need to be stopped for a clean shutdown
let mut tasks: Vec<JoinHandle<Result<()>>> = vec![];
lazy_static::initialize(&SHELLS);
console::initialize();
lazy_static::initialize(&AGENT_CONFIG);
@ -299,26 +266,17 @@ async fn start_sandbox(
tasks: &mut Vec<JoinHandle<Result<()>>>,
shutdown: Receiver<bool>,
) -> Result<()> {
let shells = SHELLS.clone();
let debug_console_vport = config.debug_console_vport as u32;
let shell_handle = if config.debug_console {
let thread_logger = logger.clone();
let shells = shells.lock().unwrap().to_vec();
if config.debug_console {
let debug_console_task = tokio::task::spawn(console::debug_console_handler(
logger.clone(),
debug_console_vport,
shutdown.clone(),
));
let handle = tokio::task::spawn_blocking(move || {
let result = setup_debug_console(&thread_logger, shells, debug_console_vport);
if result.is_err() {
// Report error, but don't fail
warn!(thread_logger, "failed to setup debug console";
"error" => format!("{}", result.unwrap_err()));
}
});
Some(handle)
} else {
None
};
tasks.push(debug_console_task);
}
// Initialize unique sandbox structure.
let s = Sandbox::new(&logger).context("Failed to create sandbox")?;
@ -350,10 +308,6 @@ async fn start_sandbox(
let _ = rx.await?;
server.shutdown().await?;
if let Some(handle) = shell_handle {
handle.await.map_err(|e| anyhow!("{:?}", e))?;
}
Ok(())
}
@ -404,284 +358,5 @@ fn sethostname(hostname: &OsStr) -> Result<()> {
}
}
lazy_static! {
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
let mut v = Vec::new();
if !cfg!(test) {
v.push("/bin/bash".to_string());
v.push("/bin/sh".to_string());
}
Arc::new(SyncMutex::new(v))
};
}
use crate::config::AgentConfig;
use nix::sys::stat::Mode;
use std::os::unix::io::{FromRawFd, RawFd};
use std::path::PathBuf;
use std::process::exit;
fn setup_debug_console(logger: &Logger, shells: Vec<String>, port: u32) -> Result<()> {
let shell = shells
.iter()
.find(|sh| PathBuf::from(sh).exists())
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
if port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, port);
socket::bind(listenfd, &addr)?;
socket::listen(listenfd, 1)?;
loop {
let f: RawFd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
match run_debug_console_shell(logger, shell, f) {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
} else {
let mut flags = OFlag::empty();
flags.insert(OFlag::O_RDWR);
flags.insert(OFlag::O_CLOEXEC);
loop {
let f: RawFd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
match run_debug_console_shell(logger, shell, f) {
Ok(_) => {
info!(logger, "run_debug_console_shell session finished");
}
Err(err) => {
error!(logger, "run_debug_console_shell failed: {:?}", err);
}
}
}
};
}
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = [0; DEFAULT_BUF_SIZE];
let buf_len;
match reader.read(&mut buf) {
Ok(0) => return Ok(0),
Ok(len) => buf_len = len,
Err(err) => return Err(err),
};
// write and return
match writer.write_all(&buf[..buf_len]) {
Ok(_) => Ok(buf_len as u64),
Err(err) => Err(err),
}
}
fn run_debug_console_shell(logger: &Logger, shell: &str, socket_fd: RawFd) -> Result<()> {
let pseduo = pty::openpty(None, None)?;
let _ = fcntl::fcntl(pseduo.master, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let _ = fcntl::fcntl(pseduo.slave, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC));
let slave_fd = pseduo.slave;
match fork() {
Ok(ForkResult::Child) => {
// create new session with child as session leader
setsid()?;
// dup stdin, stdout, stderr to let child act as a terminal
dup2(slave_fd, STDIN_FILENO)?;
dup2(slave_fd, STDOUT_FILENO)?;
dup2(slave_fd, STDERR_FILENO)?;
// set tty
unsafe {
libc::ioctl(0, libc::TIOCSCTTY);
}
let cmd = CString::new(shell).unwrap();
let args: Vec<&CStr> = vec![];
// run shell
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
}
_ => std::process::exit(-2),
});
}
Ok(ForkResult::Parent { child: child_pid }) => {
info!(logger, "get debug shell pid {:?}", child_pid);
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let master_fd = pseduo.master;
let debug_shell_logger = logger.clone();
// channel that used to sync between thread and main process
let (tx, rx) = std::sync::mpsc::channel::<i32>();
// start a thread to do IO copy between socket and pseduo.master
std::thread::spawn(move || {
let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
loop {
let mut fd_set = FdSet::new();
fd_set.insert(rfd);
fd_set.insert(master_fd);
fd_set.insert(socket_fd);
match select(
Some(fd_set.highest().unwrap() + 1),
&mut fd_set,
None,
None,
None,
) {
Ok(_) => (),
Err(e) => {
if e == nix::Error::from(nix::errno::Errno::EINTR) {
continue;
} else {
error!(debug_shell_logger, "select error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
if fd_set.contains(rfd) {
info!(
debug_shell_logger,
"debug shell process {} exited", child_pid
);
tx.send(1).unwrap();
break;
}
if fd_set.contains(master_fd) {
match io_copy(&mut master_reader, &mut socket_writer) {
Ok(0) => {
debug!(debug_shell_logger, "master fd closed");
tx.send(1).unwrap();
break;
}
Ok(_) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
error!(debug_shell_logger, "read master fd error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
if fd_set.contains(socket_fd) {
match io_copy(&mut socket_reader, &mut master_writer) {
Ok(0) => {
debug!(debug_shell_logger, "socket fd closed");
tx.send(1).unwrap();
break;
}
Ok(_) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => {
error!(debug_shell_logger, "read socket fd error {:?}", e);
tx.send(1).unwrap();
break;
}
}
}
}
});
let wait_status = wait::waitpid(child_pid, None);
info!(logger, "debug console process exit code: {:?}", wait_status);
info!(logger, "notify debug monitor thread to exit");
// close pipe to exit select loop
let _ = close(wfd);
// wait for thread exit.
let _ = rx.recv().unwrap();
info!(logger, "debug monitor thread has exited");
// close files
let _ = close(rfd);
let _ = close(master_fd);
let _ = close(slave_fd);
}
Err(err) => {
return Err(anyhow!("fork error: {:?}", err));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_setup_debug_console_no_shells() {
// Guarantee no shells have been added
// (required to avoid racing with
// test_setup_debug_console_invalid_shell()).
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
shells.clear();
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
#[test]
fn test_setup_debug_console_invalid_shell() {
let shells_ref = SHELLS.clone();
let mut shells = shells_ref.lock().unwrap();
let dir = tempdir().expect("failed to create tmpdir");
// Add an invalid shell
let shell = dir
.path()
.join("enoent")
.to_str()
.expect("failed to construct shell path")
.to_string();
shells.push(shell);
let logger = slog_scope::logger();
let result = setup_debug_console(&logger, shells.to_vec(), 0);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"no shell found to launch debug console"
);
}
}

View File

@ -3,10 +3,14 @@
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use futures::StreamExt;
use std::io;
use std::io::ErrorKind;
use std::os::unix::io::{FromRawFd, RawFd};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch::Receiver;
use tokio_vsock::{Incoming, VsockListener, VsockStream};
// Size of I/O read buffer
const BUF_SIZE: usize = 8192;
@ -52,6 +56,15 @@ where
Ok(total_bytes)
}
pub fn get_vsock_incoming(fd: RawFd) -> Incoming {
unsafe { VsockListener::from_raw_fd(fd).incoming() }
}
pub async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
let stream = get_vsock_incoming(fd).next().await.unwrap()?;
Ok(stream)
}
#[cfg(test)]
mod tests {
use super::*;