main: Refactor main logic into new async function

Move most of the main logic into a separate async function. This makes
the code clearer and avoids the anonymous async block.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-02-22 11:23:28 +00:00
parent 2a648fa760
commit 039df1d727

View File

@ -121,6 +121,108 @@ async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
Ok(stream)
}
async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
env::set_var("RUST_BACKTRACE", "full");
lazy_static::initialize(&SHELLS);
lazy_static::initialize(&AGENT_CONFIG);
// support vsock log
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let agent_config = AGENT_CONFIG.clone();
let init_mode = unistd::getpid() == Pid::from_raw(1);
if init_mode {
// dup a new file descriptor for this temporary logger writer,
// since this logger would be dropped and it's writer would
// be closed out of this code block.
let newwfd = dup(wfd)?;
let writer = unsafe { File::from_raw_fd(newwfd) };
// Init a temporary logger used by init agent as init process
// since before do the base mount, it wouldn't access "/proc/cmdline"
// to get the customzied debug level.
let (logger, logger_async_guard) =
logging::create_logger(NAME, "agent", slog::Level::Debug, writer);
// Must mount proc fs before parsing kernel command line
general_mount(&logger).map_err(|e| {
error!(logger, "fail general mount: {}", e);
e
})?;
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?;
drop(logger_async_guard);
} else {
// once parsed cmdline and set the config, release the write lock
// as soon as possible in case other thread would get read lock on
// it.
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
}
let config = agent_config.read().await;
let log_vport = config.log_vport as u32;
let log_handle = tokio::spawn(async move {
let mut reader = PipeStream::from_fd(rfd);
if log_vport > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)
.unwrap();
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport);
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap();
// copy log to stdout
tokio::io::copy(&mut reader, &mut vsock_stream)
.await
.unwrap();
}
// copy log to stdout
let mut stdout_writer = tokio::io::stdout();
let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await;
});
let writer = unsafe { File::from_raw_fd(wfd) };
// Recreate a logger with the log level get from "/proc/cmdline".
let (logger, _logger_async_guard) =
logging::create_logger(NAME, "agent", config.log_level, writer);
announce(&logger, &config);
// This "unused" variable is required as it enables the global (and crucially static) logger,
// which is required to satisfy the the lifetime constraints of the auto-generated gRPC code.
let _guard = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc")));
let mut _log_guard: Result<(), log::SetLoggerError> = Ok(());
if config.log_level == slog::Level::Trace {
// Redirect ttrpc log calls to slog iff full debug requested
_log_guard = Ok(slog_stdlog::init().map_err(|e| e)?);
}
start_sandbox(&logger, &config, init_mode).await?;
let _ = log_handle.await.unwrap();
Ok(())
}
fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
@ -145,107 +247,7 @@ fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.enable_all()
.build()?;
rt.block_on(async {
env::set_var("RUST_BACKTRACE", "full");
lazy_static::initialize(&SHELLS);
lazy_static::initialize(&AGENT_CONFIG);
// support vsock log
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
let agent_config = AGENT_CONFIG.clone();
let init_mode = unistd::getpid() == Pid::from_raw(1);
if init_mode {
// dup a new file descriptor for this temporary logger writer,
// since this logger would be dropped and it's writer would
// be closed out of this code block.
let newwfd = dup(wfd)?;
let writer = unsafe { File::from_raw_fd(newwfd) };
// Init a temporary logger used by init agent as init process
// since before do the base mount, it wouldn't access "/proc/cmdline"
// to get the customzied debug level.
let (logger, logger_async_guard) =
logging::create_logger(NAME, "agent", slog::Level::Debug, writer);
// Must mount proc fs before parsing kernel command line
general_mount(&logger).map_err(|e| {
error!(logger, "fail general mount: {}", e);
e
})?;
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?;
drop(logger_async_guard);
} else {
// once parsed cmdline and set the config, release the write lock
// as soon as possible in case other thread would get read lock on
// it.
let mut config = agent_config.write().await;
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
}
let config = agent_config.read().await;
let log_vport = config.log_vport as u32;
let log_handle = tokio::spawn(async move {
let mut reader = PipeStream::from_fd(rfd);
if log_vport > 0 {
let listenfd = socket::socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)
.unwrap();
let addr = SockAddr::new_vsock(libc::VMADDR_CID_ANY, log_vport);
socket::bind(listenfd, &addr).unwrap();
socket::listen(listenfd, 1).unwrap();
let mut vsock_stream = get_vsock_stream(listenfd).await.unwrap();
// copy log to stdout
tokio::io::copy(&mut reader, &mut vsock_stream)
.await
.unwrap();
}
// copy log to stdout
let mut stdout_writer = tokio::io::stdout();
let _ = tokio::io::copy(&mut reader, &mut stdout_writer).await;
});
let writer = unsafe { File::from_raw_fd(wfd) };
// Recreate a logger with the log level get from "/proc/cmdline".
let (logger, _logger_async_guard) =
logging::create_logger(NAME, "agent", config.log_level, writer);
announce(&logger, &config);
// This "unused" variable is required as it enables the global (and crucially static) logger,
// which is required to satisfy the the lifetime constraints of the auto-generated gRPC code.
let _guard = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc")));
let mut _log_guard: Result<(), log::SetLoggerError> = Ok(());
if config.log_level == slog::Level::Trace {
// Redirect ttrpc log calls to slog iff full debug requested
_log_guard = Ok(slog_stdlog::init().map_err(|e| e)?);
}
start_sandbox(&logger, &config, init_mode).await?;
let _ = log_handle.await.unwrap();
Ok(())
})
rt.block_on(real_main())
}
async fn start_sandbox(logger: &Logger, config: &AgentConfig, init_mode: bool) -> Result<()> {