mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
agent: Store uevent watchers in Vec rather than HashMap
Sandbox:dev_watcher is a HashMap from a "device address" to a channel used to notify get_device_name() that a suitable uevent has been found. However, "device address" isn't well defined, having somewhat different meanings for different device/event types. We never actually look up this HashMap by key, except to remove entries. Not looking up by key suggests that a map is not the appropriate data structure here. Furthermore, HashMap imposes limitations on the types which will prevent some future extensions we want. So, replace the HashMap with a Vec<Option<>>. We need the Option<> so that we can remove entries by index (removing them from the Vec completely would hange the indices of other entries, possibly breaking concurrent work. This does mean that the vector will keep growing as we watch for different events during startup. However, we don't expect the number of device events we watch for during a run to be very large, so that shouldn't be a problem. We can optimize this later if it becomes a problem. Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
This commit is contained in:
parent
91e0ef5c90
commit
55ed2ddd07
@ -103,7 +103,8 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul
|
||||
// Note this is done inside the lock, not to miss any events from the
|
||||
// global udev listener.
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<Uevent>();
|
||||
sb.dev_watcher.insert(dev_addr.to_string(), tx);
|
||||
let idx = sb.uevent_watchers.len();
|
||||
sb.uevent_watchers.push(Some((dev_addr.to_string(), tx)));
|
||||
drop(sb); // unlock
|
||||
|
||||
info!(sl!(), "Waiting on channel for device notification\n");
|
||||
@ -113,7 +114,7 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul
|
||||
Ok(v) => v?,
|
||||
Err(_) => {
|
||||
let mut sb = sandbox.lock().await;
|
||||
sb.dev_watcher.remove_entry(dev_addr);
|
||||
sb.uevent_watchers[idx].take();
|
||||
|
||||
return Err(anyhow!(
|
||||
"Timeout reached after {:?} waiting for device {}",
|
||||
@ -804,17 +805,14 @@ mod tests {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let mut sb = watcher_sandbox.lock().await;
|
||||
let matched_key = sb
|
||||
.dev_watcher
|
||||
.keys()
|
||||
.filter(|dev_addr| devpath.contains(*dev_addr))
|
||||
.cloned()
|
||||
.next();
|
||||
|
||||
if let Some(k) = matched_key {
|
||||
let sender = sb.dev_watcher.remove(&k).unwrap();
|
||||
let _ = sender.send(uev);
|
||||
return;
|
||||
for w in &mut sb.uevent_watchers {
|
||||
if let Some((dev_addr, _)) = w {
|
||||
if devpath.contains(dev_addr.as_str()) {
|
||||
let (_, sender) = w.take().unwrap();
|
||||
let _ = sender.send(uev);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(sb); // unlock
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{thread, time};
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -38,7 +39,7 @@ pub struct Sandbox {
|
||||
pub mounts: Vec<String>,
|
||||
pub container_mounts: HashMap<String, Vec<String>>,
|
||||
pub uevent_map: HashMap<String, Uevent>,
|
||||
pub dev_watcher: HashMap<String, tokio::sync::oneshot::Sender<Uevent>>,
|
||||
pub uevent_watchers: Vec<Option<(String, oneshot::Sender<Uevent>)>>,
|
||||
pub shared_utsns: Namespace,
|
||||
pub shared_ipcns: Namespace,
|
||||
pub sandbox_pidns: Option<Namespace>,
|
||||
@ -68,7 +69,7 @@ impl Sandbox {
|
||||
mounts: Vec::new(),
|
||||
container_mounts: HashMap::new(),
|
||||
uevent_map: HashMap::new(),
|
||||
dev_watcher: HashMap::new(),
|
||||
uevent_watchers: Vec::new(),
|
||||
shared_utsns: Namespace::new(&logger),
|
||||
shared_ipcns: Namespace::new(&logger),
|
||||
sandbox_pidns: None,
|
||||
|
@ -71,38 +71,26 @@ impl Uevent {
|
||||
sb.uevent_map.insert(self.devpath.clone(), self.clone());
|
||||
|
||||
// Notify watchers that are interested in the udev event.
|
||||
// Close the channel after watcher has been notified.
|
||||
let devpath = self.devpath.clone();
|
||||
let keys: Vec<_> = sb
|
||||
.dev_watcher
|
||||
.keys()
|
||||
.filter(|dev_addr| {
|
||||
let pci_p = format!("{}{}", pci_root_bus_path, *dev_addr);
|
||||
for watch in &mut sb.uevent_watchers {
|
||||
if let Some((dev_addr, _)) = watch {
|
||||
let pci_p = format!("{}{}", pci_root_bus_path, dev_addr);
|
||||
let pmem_suffix = format!("/{}/{}", SCSI_BLOCK_SUFFIX, self.devname);
|
||||
|
||||
// blk block device
|
||||
devpath.starts_with(pci_p.as_str()) ||
|
||||
// scsi block device
|
||||
if self.devpath.starts_with(pci_p.as_str()) || // blk block device
|
||||
( // scsi block device
|
||||
dev_addr.ends_with(SCSI_BLOCK_SUFFIX) &&
|
||||
self.devpath.contains(dev_addr.as_str())
|
||||
) ||
|
||||
( // nvdimm/pmem device
|
||||
self.devpath.starts_with(ACPI_DEV_PATH) &&
|
||||
self.devpath.ends_with(pmem_suffix.as_str()) &&
|
||||
dev_addr.ends_with(pmem_suffix.as_str())
|
||||
)
|
||||
{
|
||||
(*dev_addr).ends_with(SCSI_BLOCK_SUFFIX) &&
|
||||
devpath.contains(*dev_addr)
|
||||
} ||
|
||||
// nvdimm/pmem device
|
||||
{
|
||||
let pmem_suffix = format!("/{}/{}", SCSI_BLOCK_SUFFIX, self.devname);
|
||||
devpath.starts_with(ACPI_DEV_PATH) &&
|
||||
devpath.ends_with(pmem_suffix.as_str()) &&
|
||||
dev_addr.ends_with(pmem_suffix.as_str())
|
||||
let (_, sender) = watch.take().unwrap();
|
||||
let _ = sender.send(self.clone());
|
||||
}
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
for k in keys {
|
||||
// unwrap() is safe because logic above ensures k exists
|
||||
// in the map, and it's locked so no-one else can change
|
||||
// that
|
||||
let sender = sb.dev_watcher.remove(&k).unwrap();
|
||||
let _ = sender.send(self.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user