uevent: Add shutdown channel for task

Allow the uevent task to shutdown on request.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-03-09 15:22:34 +00:00
parent d8d5b4cd1d
commit 1d448813a1
2 changed files with 57 additions and 34 deletions

View File

@ -340,7 +340,9 @@ async fn start_sandbox(
tasks.push(signal_handler_task); tasks.push(signal_handler_task);
watch_uevents(sandbox.clone()).await; let uevents_handler_task = tokio::spawn(watch_uevents(sandbox.clone(), shutdown.clone()));
tasks.push(uevents_handler_task);
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
sandbox.lock().await.sender = Some(tx); sandbox.lock().await.sender = Some(tx);

View File

@ -9,10 +9,13 @@ use crate::sandbox::Sandbox;
use crate::GLOBAL_DEVICE_WATCHER; use crate::GLOBAL_DEVICE_WATCHER;
use slog::Logger; use slog::Logger;
use anyhow::Result;
use netlink_sys::{protocols, SocketAddr, TokioSocket}; use netlink_sys::{protocols, SocketAddr, TokioSocket};
use nix::errno::Errno; use nix::errno::Errno;
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::sync::Arc; use std::sync::Arc;
use tokio::select;
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -132,49 +135,67 @@ impl Uevent {
} }
} }
pub async fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) { pub async fn watch_uevents(
sandbox: Arc<Mutex<Sandbox>>,
mut shutdown: Receiver<bool>,
) -> Result<()> {
let sref = sandbox.clone(); let sref = sandbox.clone();
let s = sref.lock().await; let s = sref.lock().await;
let logger = s.logger.new(o!("subsystem" => "uevent")); let logger = s.logger.new(o!("subsystem" => "uevent"));
tokio::spawn(async move { // Unlock the sandbox to allow a successful shutdown
let mut socket; drop(s);
unsafe {
let fd = libc::socket(
libc::AF_NETLINK,
libc::SOCK_DGRAM | libc::SOCK_CLOEXEC,
protocols::NETLINK_KOBJECT_UEVENT as libc::c_int,
);
socket = TokioSocket::from_raw_fd(fd);
}
socket.bind(&SocketAddr::new(0, 1)).unwrap();
loop { info!(logger, "starting uevents handler");
match socket.recv_from_full().await {
Err(e) => { let mut socket;
error!(logger, "receive uevent message failed"; "error" => format!("{}", e))
} unsafe {
Ok((buf, addr)) => { let fd = libc::socket(
if addr.port_number() != 0 { libc::AF_NETLINK,
// not our netlink message libc::SOCK_DGRAM | libc::SOCK_CLOEXEC,
let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG)); protocols::NETLINK_KOBJECT_UEVENT as libc::c_int,
error!(logger, "receive uevent message failed"; "error" => err_msg); );
return; socket = TokioSocket::from_raw_fd(fd);
}
socket.bind(&SocketAddr::new(0, 1))?;
loop {
select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
}
result = socket.recv_from_full() => {
match result {
Err(e) => {
error!(logger, "failed to receive uevent"; "error" => format!("{}", e))
} }
Ok((buf, addr)) => {
let text = String::from_utf8(buf); if addr.port_number() != 0 {
match text { // not our netlink message
Err(e) => { let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG));
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e)) error!(logger, "receive uevent message failed"; "error" => err_msg);
continue;
} }
Ok(text) => {
let event = Uevent::new(&text); let text = String::from_utf8(buf);
info!(logger, "got uevent message"; "event" => format!("{:?}", event)); match text {
event.process(&logger, &sandbox).await; Err(e) => {
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e))
}
Ok(text) => {
let event = Uevent::new(&text);
info!(logger, "got uevent message"; "event" => format!("{:?}", event));
event.process(&logger, &sandbox).await;
}
} }
} }
} }
} }
} }
}); }
Ok(())
} }