uevent: Add shutdown channel for task

Allow the uevent task to shutdown on request.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2021-03-09 15:22:34 +00:00
parent d8d5b4cd1d
commit 1d448813a1
2 changed files with 57 additions and 34 deletions

View File

@ -340,7 +340,9 @@ async fn start_sandbox(
tasks.push(signal_handler_task); tasks.push(signal_handler_task);
watch_uevents(sandbox.clone()).await; let uevents_handler_task = tokio::spawn(watch_uevents(sandbox.clone(), shutdown.clone()));
tasks.push(uevents_handler_task);
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
sandbox.lock().await.sender = Some(tx); sandbox.lock().await.sender = Some(tx);

View File

@ -9,10 +9,13 @@ use crate::sandbox::Sandbox;
use crate::GLOBAL_DEVICE_WATCHER; use crate::GLOBAL_DEVICE_WATCHER;
use slog::Logger; use slog::Logger;
use anyhow::Result;
use netlink_sys::{protocols, SocketAddr, TokioSocket}; use netlink_sys::{protocols, SocketAddr, TokioSocket};
use nix::errno::Errno; use nix::errno::Errno;
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::sync::Arc; use std::sync::Arc;
use tokio::select;
use tokio::sync::watch::Receiver;
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -132,13 +135,21 @@ impl Uevent {
} }
} }
pub async fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) { pub async fn watch_uevents(
sandbox: Arc<Mutex<Sandbox>>,
mut shutdown: Receiver<bool>,
) -> Result<()> {
let sref = sandbox.clone(); let sref = sandbox.clone();
let s = sref.lock().await; let s = sref.lock().await;
let logger = s.logger.new(o!("subsystem" => "uevent")); let logger = s.logger.new(o!("subsystem" => "uevent"));
tokio::spawn(async move { // Unlock the sandbox to allow a successful shutdown
drop(s);
info!(logger, "starting uevents handler");
let mut socket; let mut socket;
unsafe { unsafe {
let fd = libc::socket( let fd = libc::socket(
libc::AF_NETLINK, libc::AF_NETLINK,
@ -147,19 +158,26 @@ pub async fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
); );
socket = TokioSocket::from_raw_fd(fd); socket = TokioSocket::from_raw_fd(fd);
} }
socket.bind(&SocketAddr::new(0, 1)).unwrap();
socket.bind(&SocketAddr::new(0, 1))?;
loop { loop {
match socket.recv_from_full().await { select! {
_ = shutdown.changed() => {
info!(logger, "got shutdown request");
break;
}
result = socket.recv_from_full() => {
match result {
Err(e) => { Err(e) => {
error!(logger, "receive uevent message failed"; "error" => format!("{}", e)) error!(logger, "failed to receive uevent"; "error" => format!("{}", e))
} }
Ok((buf, addr)) => { Ok((buf, addr)) => {
if addr.port_number() != 0 { if addr.port_number() != 0 {
// not our netlink message // not our netlink message
let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG)); let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG));
error!(logger, "receive uevent message failed"; "error" => err_msg); error!(logger, "receive uevent message failed"; "error" => err_msg);
return; continue;
} }
let text = String::from_utf8(buf); let text = String::from_utf8(buf);
@ -176,5 +194,8 @@ pub async fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
} }
} }
} }
}); }
}
Ok(())
} }