diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 0aa027e38b..ad8cc65656 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -68,6 +68,7 @@ use futures::future::join_all; use futures::StreamExt as _; use rustjail::pipestream::PipeStream; use tokio::{ + io::AsyncWrite, signal::unix::{signal, SignalKind}, sync::{oneshot::Sender, Mutex, RwLock}, task::JoinHandle, @@ -123,11 +124,39 @@ async fn get_vsock_stream(fd: RawFd) -> Result { Ok(stream) } +// Create a thread to handle reading from the logger pipe. The thread will +// output to the vsock port specified, or stdout. +async fn create_logger_task(rfd: RawFd, vsock_port: u32) -> Result<()> { + let mut reader = PipeStream::from_fd(rfd); + let mut writer: Box; + + if vsock_port > 0 { + let listenfd = socket::socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::SOCK_CLOEXEC, + None, + )?; + + let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, vsock_port); + socket::bind(listenfd, &addr).unwrap(); + socket::listen(listenfd, 1).unwrap(); + + writer = Box::new(get_vsock_stream(listenfd).await.unwrap()); + } else { + writer = Box::new(tokio::io::stdout()); + } + + let _ = tokio::io::copy(&mut reader, &mut writer).await; + + Ok(()) +} + async fn real_main() -> std::result::Result<(), Box> { env::set_var("RUST_BACKTRACE", "full"); // List of tasks that need to be stopped for a clean shutdown - let mut tasks: Vec> = vec![]; + let mut tasks: Vec>> = vec![]; lazy_static::initialize(&SHELLS); @@ -173,34 +202,8 @@ async fn real_main() -> std::result::Result<(), Box> { let config = agent_config.read().await; let log_vport = config.log_vport as u32; - let log_handle = tokio::spawn(async move { - let mut reader = PipeStream::from_fd(rfd); - if log_vport > 0 { - let listenfd = socket::socket( - AddressFamily::Vsock, - SockType::Stream, - SockFlag::SOCK_CLOEXEC, - None, - ) - .unwrap(); - - let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport); - socket::bind(listenfd, &addr).unwrap(); - socket::listen(listenfd, 1).unwrap(); - - let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap(); - - // copy log to stdout - tokio::io::copy(&mut reader, &mut vsock_stream) - .await - .unwrap(); - } - - // copy log to stdout - let mut stdout_writer = tokio::io::stdout(); - let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await; - }); + let log_handle = tokio::spawn(create_logger_task(rfd, log_vport)); tasks.push(log_handle);