main: Create logger task

Encapsulate the logic for handling the task that displays logger output
into a new function to simplify the code and remove another anonymous
async block.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-02-25 09:17:23 +00:00
parent 2cf2897d31
commit dcb39c61f1

View File

@ -68,6 +68,7 @@ use futures::future::join_all;
use futures::StreamExt as _;
use rustjail::pipestream::PipeStream;
use tokio::{
io::AsyncWrite,
signal::unix::{signal, SignalKind},
sync::{oneshot::Sender, Mutex, RwLock},
task::JoinHandle,
@ -123,11 +124,39 @@ async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
Ok(stream)
}
// Create a thread to handle reading from the logger pipe. The thread will
// output to the vsock port specified, or stdout.
async fn create_logger_task(rfd: RawFd, vsock_port: u32) -> Result<()> {
let mut reader = PipeStream::from_fd(rfd);
let mut writer: Box<dyn AsyncWrite + Unpin + Send>;
if vsock_port > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)?;
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, vsock_port);
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
writer = Box::new(get_vsock_stream(listenfd).await.unwrap());
} else {
writer = Box::new(tokio::io::stdout());
}
let _ = tokio::io::copy(&mut reader, &mut writer).await;
Ok(())
}
async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
env::set_var("RUST_BACKTRACE", "full");
// List of tasks that need to be stopped for a clean shutdown
let mut tasks: Vec<JoinHandle<()>> = vec![];
let mut tasks: Vec<JoinHandle<Result<()>>> = vec![];
lazy_static::initialize(&SHELLS);
@ -173,34 +202,8 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let config = agent_config.read().await;
let log_vport = config.log_vport as u32;
let log_handle = tokio::spawn(async move {
let mut reader = PipeStream::from_fd(rfd);
if log_vport > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)
.unwrap();
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport);
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap();
// copy log to stdout
tokio::io::copy(&mut reader, &mut vsock_stream)
.await
.unwrap();
}
// copy log to stdout
let mut stdout_writer = tokio::io::stdout();
let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await;
});
let log_handle = tokio::spawn(create_logger_task(rfd, log_vport));
tasks.push(log_handle);