mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
agent: async the debug console
Make the debug console in this commit. Finish the rework of debug console. Fixes: #1647 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
9017e1100b
commit
790332575b
@ -4,10 +4,9 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use crate::util;
|
||||
use anyhow::{anyhow, Result};
|
||||
use lazy_static;
|
||||
use nix::fcntl::{self, OFlag};
|
||||
use nix::fcntl::{FcntlArg, FdFlag};
|
||||
use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
|
||||
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
|
||||
use nix::pty::{openpty, OpenptyResult};
|
||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||
@ -16,18 +15,19 @@ use nix::sys::wait;
|
||||
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
|
||||
use rustjail::pipestream::PipeStream;
|
||||
use slog::Logger;
|
||||
use std::ffi::{CStr, CString};
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
use std::ffi::CString;
|
||||
use std::os::unix::io::{FromRawFd, RawFd};
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as SyncMutex;
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::select;
|
||||
use tokio::sync::watch::Receiver;
|
||||
use tokio::{pin, select};
|
||||
|
||||
const CONSOLE_PATH: &str = "/dev/console";
|
||||
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
|
||||
|
||||
lazy_static! {
|
||||
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
|
||||
@ -53,16 +53,13 @@ pub async fn debug_console_handler(
|
||||
) -> Result<()> {
|
||||
let logger = logger.new(o!("subsystem" => "debug-console"));
|
||||
|
||||
let shells = SHELLS.clone();
|
||||
let shells = shells.lock().unwrap().to_vec();
|
||||
let shells = SHELLS.lock().unwrap().to_vec();
|
||||
|
||||
let shell = shells
|
||||
.iter()
|
||||
.into_iter()
|
||||
.find(|sh| PathBuf::from(sh).exists())
|
||||
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
|
||||
|
||||
let fd: RawFd;
|
||||
|
||||
if port > 0 {
|
||||
let listenfd = socket::socket(
|
||||
AddressFamily::Vsock,
|
||||
@ -74,40 +71,66 @@ pub async fn debug_console_handler(
|
||||
socket::bind(listenfd, &addr)?;
|
||||
socket::listen(listenfd, 1)?;
|
||||
|
||||
fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
|
||||
let mut incoming = util::get_vsock_incoming(listenfd);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
_ = shutdown.changed() => {
|
||||
info!(logger, "debug console got shutdown request");
|
||||
break;
|
||||
}
|
||||
|
||||
conn = incoming.next() => {
|
||||
if let Some(conn) = conn {
|
||||
// Accept a new connection
|
||||
match conn {
|
||||
Ok(stream) => {
|
||||
let logger = logger.clone();
|
||||
let shell = shell.clone();
|
||||
// Do not block(await) here, or we'll never receive the shutdown signal
|
||||
tokio::spawn(async move {
|
||||
let _ = run_debug_console_vsock(logger, shell, stream).await;
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!(logger, "{:?}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut flags = OFlag::empty();
|
||||
flags.insert(OFlag::O_RDWR);
|
||||
flags.insert(OFlag::O_CLOEXEC);
|
||||
|
||||
fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
|
||||
};
|
||||
let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
|
||||
|
||||
loop {
|
||||
select! {
|
||||
_ = shutdown.changed() => {
|
||||
info!(logger, "got shutdown request");
|
||||
break;
|
||||
info!(logger, "debug console got shutdown request");
|
||||
}
|
||||
|
||||
// BUG: FIXME: wait on parent.
|
||||
//result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => {
|
||||
// match result {
|
||||
// Ok(_) => {
|
||||
// info!(logger, "run_debug_console_shell session finished");
|
||||
// }
|
||||
// Err(err) => {
|
||||
// error!(logger, "run_debug_console_shell failed: {:?}", err);
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
result = run_debug_console_serial(shell.clone(), fd) => {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
info!(logger, "run_debug_console_shell session finished");
|
||||
}
|
||||
Err(err) => {
|
||||
error!(logger, "run_debug_console_shell failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
||||
fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> {
|
||||
// create new session with child as session leader
|
||||
setsid()?;
|
||||
|
||||
@ -122,10 +145,9 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
||||
}
|
||||
|
||||
let cmd = CString::new(shell).unwrap();
|
||||
let args: Vec<&CStr> = vec![];
|
||||
|
||||
// run shell
|
||||
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
|
||||
let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e {
|
||||
nix::Error::Sys(errno) => {
|
||||
std::process::exit(errno as i32);
|
||||
}
|
||||
@ -135,122 +157,45 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_in_parent(
|
||||
async fn run_in_parent<T: AsyncRead + AsyncWrite>(
|
||||
logger: Logger,
|
||||
mut shutdown: Receiver<bool>,
|
||||
socket_fd: RawFd,
|
||||
stream: T,
|
||||
pseudo: OpenptyResult,
|
||||
child_pid: Pid,
|
||||
) -> Result<()> {
|
||||
info!(logger, "get debug shell pid {:?}", child_pid);
|
||||
|
||||
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
|
||||
let master_fd = pseudo.master;
|
||||
let slave_fd = pseudo.slave;
|
||||
//let debug_shell_logger = logger.clone();
|
||||
let _ = close(pseudo.slave);
|
||||
|
||||
let logger = logger.clone();
|
||||
|
||||
// channel that used to sync between thread and main process
|
||||
let (tx, rx) = std::sync::mpsc::channel::<i32>();
|
||||
|
||||
// start a thread to do IO copy between socket and pseudo.master
|
||||
//tokio::spawn(async move {
|
||||
//let logger = logger.clone();
|
||||
//let mut shutdown = shutdown.clone();
|
||||
|
||||
//let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
|
||||
//let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
|
||||
//let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
|
||||
//let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
|
||||
|
||||
let mut pipe_reader = PipeStream::from_fd(rfd);
|
||||
|
||||
//let mut pty_master_reader = PipeStream::from_fd(master_fd);
|
||||
//let mut socket_reader = PipeStream::from_fd(socket_fd);
|
||||
|
||||
//pin!(pipe_reader);
|
||||
|
||||
// BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd
|
||||
// (see commented out code below).
|
||||
loop {
|
||||
select! {
|
||||
_ = shutdown.changed() => {
|
||||
info!(logger, "got shutdown request");
|
||||
break;
|
||||
},
|
||||
_ = pipe_reader => {
|
||||
info!(
|
||||
debug_shell_logger,
|
||||
"debug shell process {} exited", child_pid
|
||||
);
|
||||
tx.send(1).unwrap();
|
||||
},
|
||||
let (mut socket_reader, mut socket_writer) = tokio::io::split(stream);
|
||||
let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd));
|
||||
|
||||
select! {
|
||||
res = tokio::io::copy(&mut master_reader, &mut socket_writer) => {
|
||||
debug!(
|
||||
logger,
|
||||
"master closed: {:?}", res
|
||||
);
|
||||
}
|
||||
res = tokio::io::copy(&mut socket_reader, &mut master_writer) => {
|
||||
info!(
|
||||
logger,
|
||||
"socket closed: {:?}", res
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// if fd_set.contains(master_fd) {
|
||||
// match io_copy(&mut master_reader, &mut socket_writer) {
|
||||
// Ok(0) => {
|
||||
// debug!(debug_shell_logger, "master fd closed");
|
||||
// tx.send(1).unwrap();
|
||||
// break;
|
||||
// }
|
||||
// Ok(_) => {}
|
||||
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||
// Err(e) => {
|
||||
// error!(debug_shell_logger, "read master fd error {:?}", e);
|
||||
// tx.send(1).unwrap();
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// if fd_set.contains(socket_fd) {
|
||||
// match io_copy(&mut socket_reader, &mut master_writer) {
|
||||
// Ok(0) => {
|
||||
// debug!(debug_shell_logger, "socket fd closed");
|
||||
// tx.send(1).unwrap();
|
||||
// break;
|
||||
// }
|
||||
// Ok(_) => {}
|
||||
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||
// Err(e) => {
|
||||
// error!(debug_shell_logger, "read socket fd error {:?}", e);
|
||||
// tx.send(1).unwrap();
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//})
|
||||
//.await;
|
||||
|
||||
let wait_status = wait::waitpid(child_pid, None);
|
||||
info!(logger, "debug console process exit code: {:?}", wait_status);
|
||||
|
||||
info!(logger, "notify debug monitor thread to exit");
|
||||
// close pipe to exit select loop
|
||||
let _ = close(wfd);
|
||||
|
||||
// wait for thread exit.
|
||||
let _ = rx.recv().unwrap();
|
||||
info!(logger, "debug monitor thread has exited");
|
||||
|
||||
// close files
|
||||
let _ = close(rfd);
|
||||
let _ = close(master_fd);
|
||||
let _ = close(slave_fd);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_debug_console_shell(
|
||||
async fn run_debug_console_vsock<T: AsyncRead + AsyncWrite>(
|
||||
logger: Logger,
|
||||
shell: &str,
|
||||
socket_fd: RawFd,
|
||||
shutdown: Receiver<bool>,
|
||||
shell: String,
|
||||
stream: T,
|
||||
) -> Result<()> {
|
||||
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
|
||||
|
||||
@ -263,19 +208,30 @@ async fn run_debug_console_shell(
|
||||
match fork() {
|
||||
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
|
||||
Ok(ForkResult::Parent { child: child_pid }) => {
|
||||
run_in_parent(
|
||||
logger.clone(),
|
||||
shutdown.clone(),
|
||||
socket_fd,
|
||||
pseudo,
|
||||
child_pid,
|
||||
)
|
||||
.await
|
||||
run_in_parent(logger.clone(), stream, pseudo, child_pid).await
|
||||
}
|
||||
Err(err) => Err(anyhow!("fork error: {:?}", err)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> {
|
||||
let mut child = match tokio::process::Command::new(shell)
|
||||
.arg("-i")
|
||||
.kill_on_drop(true)
|
||||
.stdin(unsafe { Stdio::from_raw_fd(fd) })
|
||||
.stdout(unsafe { Stdio::from_raw_fd(fd) })
|
||||
.stderr(unsafe { Stdio::from_raw_fd(fd) })
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(anyhow!("failed to spawn shell")),
|
||||
};
|
||||
|
||||
child.wait().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// BUG: FIXME:
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@ -328,26 +284,3 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// BUG: FIXME: should not be required as we can use the
|
||||
// interruptable_io_copier(). But if it is still needed, move to utils.rs.
|
||||
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
|
||||
where
|
||||
R: Read,
|
||||
W: Write,
|
||||
{
|
||||
let mut buf = [0; DEFAULT_BUF_SIZE];
|
||||
let buf_len;
|
||||
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => return Ok(0),
|
||||
Ok(len) => buf_len = len,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
// write and return
|
||||
match writer.write_all(&buf[..buf_len]) {
|
||||
Ok(_) => Ok(buf_len as u64),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
@ -20,11 +20,9 @@ extern crate scopeguard;
|
||||
extern crate slog;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use nix::fcntl::{self, OFlag};
|
||||
use nix::fcntl::{FcntlArg, FdFlag};
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||
use nix::unistd::{self, dup, Pid};
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::{self, File};
|
||||
@ -61,7 +59,6 @@ use slog::Logger;
|
||||
use uevent::watch_uevents;
|
||||
|
||||
use futures::future::join_all;
|
||||
use futures::StreamExt as _;
|
||||
use rustjail::pipestream::PipeStream;
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
@ -71,7 +68,6 @@ use tokio::{
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_vsock::{Incoming, VsockListener, VsockStream};
|
||||
|
||||
mod rpc;
|
||||
|
||||
@ -96,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) {
|
||||
);
|
||||
}
|
||||
|
||||
fn set_fd_close_exec(fd: RawFd) -> Result<RawFd> {
|
||||
if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) {
|
||||
return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e));
|
||||
}
|
||||
Ok(fd)
|
||||
}
|
||||
|
||||
fn get_vsock_incoming(fd: RawFd) -> Incoming {
|
||||
let incoming;
|
||||
unsafe {
|
||||
incoming = VsockListener::from_raw_fd(fd).incoming();
|
||||
}
|
||||
incoming
|
||||
}
|
||||
|
||||
async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
|
||||
let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap();
|
||||
set_fd_close_exec(stream.as_raw_fd())?;
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
// Create a thread to handle reading from the logger pipe. The thread will
|
||||
// output to the vsock port specified, or stdout.
|
||||
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
|
||||
@ -135,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool
|
||||
socket::bind(listenfd, &addr).unwrap();
|
||||
socket::listen(listenfd, 1).unwrap();
|
||||
|
||||
writer = Box::new(get_vsock_stream(listenfd).await.unwrap());
|
||||
writer = Box::new(util::get_vsock_stream(listenfd).await.unwrap());
|
||||
} else {
|
||||
writer = Box::new(tokio::io::stdout());
|
||||
}
|
||||
|
@ -3,10 +3,14 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use anyhow::Result;
|
||||
use futures::StreamExt;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use std::os::unix::io::{FromRawFd, RawFd};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::sync::watch::Receiver;
|
||||
use tokio_vsock::{Incoming, VsockListener, VsockStream};
|
||||
|
||||
// Size of I/O read buffer
|
||||
const BUF_SIZE: usize = 8192;
|
||||
@ -52,6 +56,15 @@ where
|
||||
Ok(total_bytes)
|
||||
}
|
||||
|
||||
pub fn get_vsock_incoming(fd: RawFd) -> Incoming {
|
||||
unsafe { VsockListener::from_raw_fd(fd).incoming() }
|
||||
}
|
||||
|
||||
pub async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
|
||||
let stream = get_vsock_incoming(fd).next().await.unwrap()?;
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
Loading…
Reference in New Issue
Block a user