mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-05-01 05:04:26 +00:00
uevent: Add shutdown channel for task
Allow the uevent task to shutdown on request. Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
parent
d8d5b4cd1d
commit
1d448813a1
@ -340,7 +340,9 @@ async fn start_sandbox(
|
|||||||
|
|
||||||
tasks.push(signal_handler_task);
|
tasks.push(signal_handler_task);
|
||||||
|
|
||||||
watch_uevents(sandbox.clone()).await;
|
let uevents_handler_task = tokio::spawn(watch_uevents(sandbox.clone(), shutdown.clone()));
|
||||||
|
|
||||||
|
tasks.push(uevents_handler_task);
|
||||||
|
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
sandbox.lock().await.sender = Some(tx);
|
sandbox.lock().await.sender = Some(tx);
|
||||||
|
@ -9,10 +9,13 @@ use crate::sandbox::Sandbox;
|
|||||||
use crate::GLOBAL_DEVICE_WATCHER;
|
use crate::GLOBAL_DEVICE_WATCHER;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
use netlink_sys::{protocols, SocketAddr, TokioSocket};
|
use netlink_sys::{protocols, SocketAddr, TokioSocket};
|
||||||
use nix::errno::Errno;
|
use nix::errno::Errno;
|
||||||
use std::os::unix::io::FromRawFd;
|
use std::os::unix::io::FromRawFd;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::sync::watch::Receiver;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
@ -132,49 +135,67 @@ impl Uevent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
|
pub async fn watch_uevents(
|
||||||
|
sandbox: Arc<Mutex<Sandbox>>,
|
||||||
|
mut shutdown: Receiver<bool>,
|
||||||
|
) -> Result<()> {
|
||||||
let sref = sandbox.clone();
|
let sref = sandbox.clone();
|
||||||
let s = sref.lock().await;
|
let s = sref.lock().await;
|
||||||
let logger = s.logger.new(o!("subsystem" => "uevent"));
|
let logger = s.logger.new(o!("subsystem" => "uevent"));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
// Unlock the sandbox to allow a successful shutdown
|
||||||
let mut socket;
|
drop(s);
|
||||||
unsafe {
|
|
||||||
let fd = libc::socket(
|
|
||||||
libc::AF_NETLINK,
|
|
||||||
libc::SOCK_DGRAM | libc::SOCK_CLOEXEC,
|
|
||||||
protocols::NETLINK_KOBJECT_UEVENT as libc::c_int,
|
|
||||||
);
|
|
||||||
socket = TokioSocket::from_raw_fd(fd);
|
|
||||||
}
|
|
||||||
socket.bind(&SocketAddr::new(0, 1)).unwrap();
|
|
||||||
|
|
||||||
loop {
|
info!(logger, "starting uevents handler");
|
||||||
match socket.recv_from_full().await {
|
|
||||||
Err(e) => {
|
let mut socket;
|
||||||
error!(logger, "receive uevent message failed"; "error" => format!("{}", e))
|
|
||||||
}
|
unsafe {
|
||||||
Ok((buf, addr)) => {
|
let fd = libc::socket(
|
||||||
if addr.port_number() != 0 {
|
libc::AF_NETLINK,
|
||||||
// not our netlink message
|
libc::SOCK_DGRAM | libc::SOCK_CLOEXEC,
|
||||||
let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG));
|
protocols::NETLINK_KOBJECT_UEVENT as libc::c_int,
|
||||||
error!(logger, "receive uevent message failed"; "error" => err_msg);
|
);
|
||||||
return;
|
socket = TokioSocket::from_raw_fd(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.bind(&SocketAddr::new(0, 1))?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_ = shutdown.changed() => {
|
||||||
|
info!(logger, "got shutdown request");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = socket.recv_from_full() => {
|
||||||
|
match result {
|
||||||
|
Err(e) => {
|
||||||
|
error!(logger, "failed to receive uevent"; "error" => format!("{}", e))
|
||||||
}
|
}
|
||||||
|
Ok((buf, addr)) => {
|
||||||
let text = String::from_utf8(buf);
|
if addr.port_number() != 0 {
|
||||||
match text {
|
// not our netlink message
|
||||||
Err(e) => {
|
let err_msg = format!("{:?}", nix::Error::Sys(Errno::EBADMSG));
|
||||||
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e))
|
error!(logger, "receive uevent message failed"; "error" => err_msg);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
Ok(text) => {
|
|
||||||
let event = Uevent::new(&text);
|
let text = String::from_utf8(buf);
|
||||||
info!(logger, "got uevent message"; "event" => format!("{:?}", event));
|
match text {
|
||||||
event.process(&logger, &sandbox).await;
|
Err(e) => {
|
||||||
|
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e))
|
||||||
|
}
|
||||||
|
Ok(text) => {
|
||||||
|
let event = Uevent::new(&text);
|
||||||
|
info!(logger, "got uevent message"; "event" => format!("{:?}", event));
|
||||||
|
event.process(&logger, &sandbox).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user