agent: refine uevent.rs for better maintenance

Refine uevent.rs for better maintenance:
1) use dedicated function to handle uevents.
2) use dedicated function to handle blk add events.

Signed-off-by: Liu Jiang <gerry@linux.alibaba.com>
This commit is contained in:
Liu Jiang
2019-12-02 10:05:44 +08:00
parent 8868eaeb4c
commit a4adacaa10

View File

@@ -8,10 +8,12 @@ use crate::grpc::SYSFS_MEMORY_ONLINE_PATH;
use crate::netlink::{RtnlHandle, NETLINK_UEVENT};
use crate::sandbox::Sandbox;
use crate::GLOBAL_DEVICE_WATCHER;
use slog::Logger;
use std::sync::{Arc, Mutex};
use std::thread;
pub const U_EVENT_ACTION: &str = "ACTION";
pub const U_EVENT_ACTION_ADD: &str = "add";
pub const U_EVENT_DEV_PATH: &str = "DEVPATH";
pub const U_EVENT_SUB_SYSTEM: &str = "SUBSYSTEM";
pub const U_EVENT_SEQ_NUM: &str = "SEQNUM";
@@ -19,7 +21,7 @@ pub const U_EVENT_DEV_NAME: &str = "DEVNAME";
pub const U_EVENT_INTERFACE: &str = "INTERFACE";
#[derive(Debug, Default)]
pub struct Uevent {
struct Uevent {
action: String,
devpath: String,
devname: String,
@@ -28,7 +30,8 @@ pub struct Uevent {
interface: String,
}
fn parse_uevent(message: &str) -> Uevent {
impl Uevent {
fn new(message: &str) -> Self {
let mut msg_iter = message.split('\0');
let mut event = Uevent::default();
@@ -49,50 +52,28 @@ fn parse_uevent(message: &str) -> Uevent {
}
event
}
pub fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
let sref = sandbox.clone();
let s = sref.lock().unwrap();
let logger = s.logger.new(o!("subsystem" => "uevent"));
thread::spawn(move || {
let rtnl = RtnlHandle::new(NETLINK_UEVENT, 1).unwrap();
loop {
match rtnl.recv_message() {
Err(e) => {
error!(logger, "receive uevent message failed"; "error" => format!("{}", e))
}
Ok(data) => {
let text = String::from_utf8(data);
match text {
Err(e) => {
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e))
// Check whether this is a block device hot-add event.
fn is_block_add_event(&self) -> bool {
self.action == U_EVENT_ACTION_ADD
&& self.subsystem == "block"
&& self.devpath.starts_with(ROOT_BUS_PATH)
&& self.devname != ""
}
Ok(text) => {
let event = parse_uevent(&text);
info!(logger, "got uevent message"; "event" => format!("{:?}", event));
// Check if device hotplug event results in a device node being created.
if event.devname != ""
&& event.devpath.starts_with(ROOT_BUS_PATH)
&& event.subsystem == "block"
{
let watcher = GLOBAL_DEVICE_WATCHER.clone();
let mut w = watcher.lock().unwrap();
let s = sandbox.clone();
let mut sb = s.lock().unwrap();
fn handle_block_add_event(&self, sandbox: &Arc<Mutex<Sandbox>>) {
// Keep the same lock order as device::get_device_name(), otherwise it may cause deadlock.
let mut w = GLOBAL_DEVICE_WATCHER.lock().unwrap();
let mut sb = sandbox.lock().unwrap();
// Add the device node name to the pci device map.
sb.pci_device_map
.insert(event.devpath.clone(), event.devname.clone());
.insert(self.devpath.clone(), self.devname.clone());
// Notify watchers that are interested in the udev event.
// Close the channel after watcher has been notified.
let devpath = event.devpath.clone();
let devpath = self.devpath.clone();
let empties: Vec<_> = w
.iter()
.filter(|(dev_addr, _)| {
@@ -107,31 +88,64 @@ pub fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
}
})
.map(|(k, sender)| {
let devname = event.devname.clone();
let devname = self.devname.clone();
let _ = sender.send(devname);
k.clone()
})
.collect();
// Remove notified nodes from the watcher map.
for empty in empties {
w.remove(&empty);
}
} else {
let online_path =
format!("{}/{}/online", SYSFS_DIR, &event.devpath);
}
fn process(&self, logger: &Logger, sandbox: &Arc<Mutex<Sandbox>>) {
if self.is_block_add_event() {
return self.handle_block_add_event(sandbox);
} else if self.action == U_EVENT_ACTION_ADD {
let online_path = format!("{}/{}/online", SYSFS_DIR, &self.devpath);
// It's a memory hot-add event.
if online_path.starts_with(SYSFS_MEMORY_ONLINE_PATH) {
// Check memory hotplug and online if possible
match online_device(online_path.as_ref()) {
Ok(_) => (),
Err(e) => error!(
logger,
if let Err(e) = online_device(online_path.as_ref()) {
error!(
*logger,
"failed to online device";
"device" => &event.devpath,
"device" => &self.devpath,
"error" => format!("{}", e),
),
);
}
return;
}
}
debug!(*logger, "ignoring event"; "uevent" => format!("{:?}", self));
}
}
pub fn watch_uevents(sandbox: Arc<Mutex<Sandbox>>) {
thread::spawn(move || {
let rtnl = RtnlHandle::new(NETLINK_UEVENT, 1).unwrap();
let logger = sandbox
.lock()
.unwrap()
.logger
.new(o!("subsystem" => "uevent"));
loop {
match rtnl.recv_message() {
Err(e) => {
error!(logger, "receive uevent message failed"; "error" => format!("{}", e))
}
Ok(data) => {
let text = String::from_utf8(data);
match text {
Err(e) => {
error!(logger, "failed to convert bytes to text"; "error" => format!("{}", e))
}
Ok(text) => {
let event = Uevent::new(&text);
info!(logger, "got uevent message"; "event" => format!("{:?}", event));
event.process(&logger, &sandbox);
}
}
}