From 039df1d727d0d0c0c87a675a151964a94ad4c5ef Mon Sep 17 00:00:00 2001 From: "James O. D. Hunt" Date: Mon, 22 Feb 2021 11:23:28 +0000 Subject: [PATCH] main: Refactor main logic into new async function Move most of the main logic into a separate async function. This makes the code clearer and avoids the anonymous async block. Signed-off-by: James O. D. Hunt --- src/agent/src/main.rs | 204 +++++++++++++++++++++--------------------- 1 file changed, 103 insertions(+), 101 deletions(-) diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index b9fb5e8b8f..efac66d5cf 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -121,6 +121,108 @@ async fn get_vsock_stream(fd: RawFd) -> Result { Ok(stream) } +async fn real_main() -> std::result::Result<(), Box> { + env::set_var("RUST_BACKTRACE", "full"); + + lazy_static::initialize(&SHELLS); + + lazy_static::initialize(&AGENT_CONFIG); + + // support vsock log + let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; + + let agent_config = AGENT_CONFIG.clone(); + + let init_mode = unistd::getpid() == Pid::from_raw(1); + if init_mode { + // dup a new file descriptor for this temporary logger writer, + // since this logger would be dropped and it's writer would + // be closed out of this code block. + let newwfd = dup(wfd)?; + let writer = unsafe { File::from_raw_fd(newwfd) }; + + // Init a temporary logger used by init agent as init process + // since before do the base mount, it wouldn't access "/proc/cmdline" + // to get the customzied debug level. + let (logger, logger_async_guard) = + logging::create_logger(NAME, "agent", slog::Level::Debug, writer); + + // Must mount proc fs before parsing kernel command line + general_mount(&logger).map_err(|e| { + error!(logger, "fail general mount: {}", e); + e + })?; + + let mut config = agent_config.write().await; + config.parse_cmdline(KERNEL_CMDLINE_FILE)?; + + init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?; + drop(logger_async_guard); + } else { + // once parsed cmdline and set the config, release the write lock + // as soon as possible in case other thread would get read lock on + // it. + let mut config = agent_config.write().await; + config.parse_cmdline(KERNEL_CMDLINE_FILE)?; + } + let config = agent_config.read().await; + + let log_vport = config.log_vport as u32; + let log_handle = tokio::spawn(async move { + let mut reader = PipeStream::from_fd(rfd); + + if log_vport > 0 { + let listenfd = socket::socket( + AddressFamily::Vsock, + SockType::Stream, + SockFlag::SOCK_CLOEXEC, + None, + ) + .unwrap(); + + let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport); + socket::bind(listenfd, &addr).unwrap(); + socket::listen(listenfd, 1).unwrap(); + + let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap(); + + // copy log to stdout + tokio::io::copy(&mut reader, &mut vsock_stream) + .await + .unwrap(); + } + + // copy log to stdout + let mut stdout_writer = tokio::io::stdout(); + let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await; + }); + + let writer = unsafe { File::from_raw_fd(wfd) }; + + // Recreate a logger with the log level get from "/proc/cmdline". + let (logger, _logger_async_guard) = + logging::create_logger(NAME, "agent", config.log_level, writer); + + announce(&logger, &config); + + // This "unused" variable is required as it enables the global (and crucially static) logger, + // which is required to satisfy the the lifetime constraints of the auto-generated gRPC code. + let _guard = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc"))); + + let mut _log_guard: Result<(), log::SetLoggerError> = Ok(()); + + if config.log_level == slog::Level::Trace { + // Redirect ttrpc log calls to slog iff full debug requested + _log_guard = Ok(slog_stdlog::init().map_err(|e| e)?); + } + + start_sandbox(&logger, &config, init_mode).await?; + + let _ = log_handle.await.unwrap(); + + Ok(()) +} + fn main() -> std::result::Result<(), Box> { let args: Vec = env::args().collect(); @@ -145,107 +247,7 @@ fn main() -> std::result::Result<(), Box> { .enable_all() .build()?; - rt.block_on(async { - env::set_var("RUST_BACKTRACE", "full"); - - lazy_static::initialize(&SHELLS); - - lazy_static::initialize(&AGENT_CONFIG); - - // support vsock log - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; - - let agent_config = AGENT_CONFIG.clone(); - - let init_mode = unistd::getpid() == Pid::from_raw(1); - if init_mode { - // dup a new file descriptor for this temporary logger writer, - // since this logger would be dropped and it's writer would - // be closed out of this code block. - let newwfd = dup(wfd)?; - let writer = unsafe { File::from_raw_fd(newwfd) }; - - // Init a temporary logger used by init agent as init process - // since before do the base mount, it wouldn't access "/proc/cmdline" - // to get the customzied debug level. - let (logger, logger_async_guard) = - logging::create_logger(NAME, "agent", slog::Level::Debug, writer); - - // Must mount proc fs before parsing kernel command line - general_mount(&logger).map_err(|e| { - error!(logger, "fail general mount: {}", e); - e - })?; - - let mut config = agent_config.write().await; - config.parse_cmdline(KERNEL_CMDLINE_FILE)?; - - init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?; - drop(logger_async_guard); - } else { - // once parsed cmdline and set the config, release the write lock - // as soon as possible in case other thread would get read lock on - // it. - let mut config = agent_config.write().await; - config.parse_cmdline(KERNEL_CMDLINE_FILE)?; - } - let config = agent_config.read().await; - - let log_vport = config.log_vport as u32; - let log_handle = tokio::spawn(async move { - let mut reader = PipeStream::from_fd(rfd); - - if log_vport > 0 { - let listenfd = socket::socket( - AddressFamily::Vsock, - SockType::Stream, - SockFlag::SOCK_CLOEXEC, - None, - ) - .unwrap(); - - let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport); - socket::bind(listenfd, &addr).unwrap(); - socket::listen(listenfd, 1).unwrap(); - - let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap(); - - // copy log to stdout - tokio::io::copy(&mut reader, &mut vsock_stream) - .await - .unwrap(); - } - - // copy log to stdout - let mut stdout_writer = tokio::io::stdout(); - let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await; - }); - - let writer = unsafe { File::from_raw_fd(wfd) }; - - // Recreate a logger with the log level get from "/proc/cmdline". - let (logger, _logger_async_guard) = - logging::create_logger(NAME, "agent", config.log_level, writer); - - announce(&logger, &config); - - // This "unused" variable is required as it enables the global (and crucially static) logger, - // which is required to satisfy the the lifetime constraints of the auto-generated gRPC code. - let _guard = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc"))); - - let mut _log_guard: Result<(), log::SetLoggerError> = Ok(()); - - if config.log_level == slog::Level::Trace { - // Redirect ttrpc log calls to slog iff full debug requested - _log_guard = Ok(slog_stdlog::init().map_err(|e| e)?); - } - - start_sandbox(&logger, &config, init_mode).await?; - - let _ = log_handle.await.unwrap(); - - Ok(()) - }) + rt.block_on(real_main()) } async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> {