agent/device: Move GLOBAL_DEVICE_WATCHER into Sandbox

In Kata 1.x, both the sysToDevMap and the deviceWatchers are in the sandbox
structure.  For some reason in Kata 2.x, the device watchers have moved to
a separate global variable, GLOBAL_DEVICE_WATCHER.

This is a bad idea: apart from introducing an extra global variable
unnecessarily, it means that Sandbox::pci_device_map and
GLOBAL_DEVICE_WATCHER are protected by separate mutexes.  Since the
information in these two structures has to be kept in sync with each other,
it makes much more sense to keep them both under the same single Sandbox
mutex.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
This commit is contained in:
David Gibson 2021-02-25 11:32:50 +11:00
parent 11ae32e3c0
commit 0616202580
4 changed files with 26 additions and 35 deletions

View File

@ -17,7 +17,7 @@ use crate::linux_abi::*;
use crate::mount::{DRIVER_BLK_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_SCSI_TYPE}; use crate::mount::{DRIVER_BLK_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_SCSI_TYPE};
use crate::pci; use crate::pci;
use crate::sandbox::Sandbox; use crate::sandbox::Sandbox;
use crate::{AGENT_CONFIG, GLOBAL_DEVICE_WATCHER}; use crate::AGENT_CONFIG;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use oci::{LinuxDeviceCgroup, LinuxResources, Spec}; use oci::{LinuxDeviceCgroup, LinuxResources, Spec};
use protocols::agent::Device; use protocols::agent::Device;
@ -88,16 +88,13 @@ fn pcipath_to_sysfs(root_bus_sysfs: &str, pcipath: &pci::Path) -> Result<String>
} }
async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Result<String> { async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Result<String> {
// Keep the same lock order as uevent::handle_block_add_event(), otherwise it may cause deadlock. let mut sb = sandbox.lock().await;
let mut w = GLOBAL_DEVICE_WATCHER.lock().await;
let sb = sandbox.lock().await;
for (key, value) in sb.pci_device_map.iter() { for (key, value) in sb.pci_device_map.iter() {
if key.contains(dev_addr) { if key.contains(dev_addr) {
info!(sl!(), "Device {} found in pci device map", dev_addr); info!(sl!(), "Device {} found in pci device map", dev_addr);
return Ok(format!("{}/{}", SYSTEM_DEV_PATH, value)); return Ok(format!("{}/{}", SYSTEM_DEV_PATH, value));
} }
} }
drop(sb);
// If device is not found in the device map, hotplug event has not // If device is not found in the device map, hotplug event has not
// been received yet, create and add channel to the watchers map. // been received yet, create and add channel to the watchers map.
@ -105,8 +102,8 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul
// Note this is done inside the lock, not to miss any events from the // Note this is done inside the lock, not to miss any events from the
// global udev listener. // global udev listener.
let (tx, rx) = tokio::sync::oneshot::channel::<String>(); let (tx, rx) = tokio::sync::oneshot::channel::<String>();
w.insert(dev_addr.to_string(), Some(tx)); sb.dev_watcher.insert(dev_addr.to_string(), tx);
drop(w); drop(sb); // unlock
info!(sl!(), "Waiting on channel for device notification\n"); info!(sl!(), "Waiting on channel for device notification\n");
let hotplug_timeout = AGENT_CONFIG.read().await.hotplug_timeout; let hotplug_timeout = AGENT_CONFIG.read().await.hotplug_timeout;
@ -114,9 +111,8 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul
let dev_name = match tokio::time::timeout(hotplug_timeout, rx).await { let dev_name = match tokio::time::timeout(hotplug_timeout, rx).await {
Ok(v) => v?, Ok(v) => v?,
Err(_) => { Err(_) => {
let watcher = GLOBAL_DEVICE_WATCHER.clone(); let mut sb = sandbox.lock().await;
let mut w = watcher.lock().await; sb.dev_watcher.remove_entry(dev_addr);
w.remove_entry(dev_addr);
return Err(anyhow!( return Err(anyhow!(
"Timeout reached after {:?} waiting for device {}", "Timeout reached after {:?} waiting for device {}",
@ -800,21 +796,23 @@ mod tests {
sb.pci_device_map.remove(&devpath); sb.pci_device_map.remove(&devpath);
drop(sb); // unlock drop(sb); // unlock
let watcher_sandbox = Arc::clone(&sandbox);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let mut w = GLOBAL_DEVICE_WATCHER.lock().await; let mut sb = watcher_sandbox.lock().await;
let matched_key = w let matched_key = sb
.dev_watcher
.keys() .keys()
.filter(|dev_addr| devpath.contains(*dev_addr)) .filter(|dev_addr| devpath.contains(*dev_addr))
.cloned() .cloned()
.next(); .next();
if let Some(k) = matched_key { if let Some(k) = matched_key {
let sender = w.remove(&k).unwrap().unwrap(); let sender = sb.dev_watcher.remove(&k).unwrap();
let _ = sender.send(devname.to_string()); let _ = sender.send(devname.to_string());
return; return;
} }
drop(w); // unlock drop(sb); // unlock
} }
}); });

View File

@ -28,7 +28,6 @@ use nix::sys::select::{select, FdSet};
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use nix::sys::wait; use nix::sys::wait;
use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult};
use std::collections::HashMap;
use std::env; use std::env;
use std::ffi::{CStr, CString, OsStr}; use std::ffi::{CStr, CString, OsStr};
use std::fs::{self, File}; use std::fs::{self, File};
@ -72,7 +71,6 @@ use rustjail::pipestream::PipeStream;
use tokio::{ use tokio::{
io::AsyncWrite, io::AsyncWrite,
sync::{ sync::{
oneshot::Sender,
watch::{channel, Receiver}, watch::{channel, Receiver},
Mutex, RwLock, Mutex, RwLock,
}, },
@ -89,8 +87,6 @@ const CONSOLE_PATH: &str = "/dev/console";
const DEFAULT_BUF_SIZE: usize = 8 * 1024; const DEFAULT_BUF_SIZE: usize = 8 * 1024;
lazy_static! { lazy_static! {
static ref GLOBAL_DEVICE_WATCHER: Arc<Mutex<HashMap<String, Option<Sender<String>>>>> =
Arc::new(Mutex::new(HashMap::new()));
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> = static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> =
Arc::new(RwLock::new(config::AgentConfig::new())); Arc::new(RwLock::new(config::AgentConfig::new()));
} }

View File

@ -37,6 +37,7 @@ pub struct Sandbox {
pub mounts: Vec<String>, pub mounts: Vec<String>,
pub container_mounts: HashMap<String, Vec<String>>, pub container_mounts: HashMap<String, Vec<String>>,
pub pci_device_map: HashMap<String, String>, pub pci_device_map: HashMap<String, String>,
pub dev_watcher: HashMap<String, tokio::sync::oneshot::Sender<String>>,
pub shared_utsns: Namespace, pub shared_utsns: Namespace,
pub shared_ipcns: Namespace, pub shared_ipcns: Namespace,
pub sandbox_pidns: Option<Namespace>, pub sandbox_pidns: Option<Namespace>,
@ -66,6 +67,7 @@ impl Sandbox {
mounts: Vec::new(), mounts: Vec::new(),
container_mounts: HashMap::new(), container_mounts: HashMap::new(),
pci_device_map: HashMap::new(), pci_device_map: HashMap::new(),
dev_watcher: HashMap::new(),
shared_utsns: Namespace::new(&logger), shared_utsns: Namespace::new(&logger),
shared_ipcns: Namespace::new(&logger), shared_ipcns: Namespace::new(&logger),
sandbox_pidns: None, sandbox_pidns: None,

View File

@ -6,7 +6,6 @@
use crate::device::online_device; use crate::device::online_device;
use crate::linux_abi::*; use crate::linux_abi::*;
use crate::sandbox::Sandbox; use crate::sandbox::Sandbox;
use crate::GLOBAL_DEVICE_WATCHER;
use slog::Logger; use slog::Logger;
use anyhow::Result; use anyhow::Result;
@ -66,10 +65,6 @@ impl Uevent {
async fn handle_block_add_event(&self, sandbox: &Arc<Mutex<Sandbox>>) { async fn handle_block_add_event(&self, sandbox: &Arc<Mutex<Sandbox>>) {
let pci_root_bus_path = create_pci_root_bus_path(); let pci_root_bus_path = create_pci_root_bus_path();
// Keep the same lock order as device::get_device_name(), otherwise it may cause deadlock.
let watcher = GLOBAL_DEVICE_WATCHER.clone();
let mut w = watcher.lock().await;
let mut sb = sandbox.lock().await; let mut sb = sandbox.lock().await;
// Add the device node name to the pci device map. // Add the device node name to the pci device map.
@ -79,9 +74,10 @@ impl Uevent {
// Notify watchers that are interested in the udev event. // Notify watchers that are interested in the udev event.
// Close the channel after watcher has been notified. // Close the channel after watcher has been notified.
let devpath = self.devpath.clone(); let devpath = self.devpath.clone();
let empties: Vec<_> = w let keys: Vec<_> = sb
.iter_mut() .dev_watcher
.filter(|(dev_addr, _)| { .keys()
.filter(|dev_addr| {
let pci_p = format!("{}{}", pci_root_bus_path, *dev_addr); let pci_p = format!("{}{}", pci_root_bus_path, *dev_addr);
// blk block device // blk block device
@ -99,17 +95,16 @@ impl Uevent {
dev_addr.ends_with(pmem_suffix.as_str()) dev_addr.ends_with(pmem_suffix.as_str())
} }
}) })
.map(|(k, sender)| { .cloned()
let devname = self.devname.clone();
let sender = sender.take().unwrap();
let _ = sender.send(devname);
k.clone()
})
.collect(); .collect();
// Remove notified nodes from the watcher map. for k in keys {
for empty in empties { let devname = self.devname.clone();
w.remove(&empty); // unwrap() is safe because logic above ensures k exists
// in the map, and it's locked so no-one else can change
// that
let sender = sb.dev_watcher.remove(&k).unwrap();
let _ = sender.send(devname);
} }
} }