mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-24 21:51:37 +00:00 
			
		
		
		
	agent/device: Move GLOBAL_DEVICE_WATCHER into Sandbox
In Kata 1.x, both the sysToDevMap and the deviceWatchers are in the sandbox structure. For some reason in Kata 2.x, the device watchers have moved to a separate global variable, GLOBAL_DEVICE_WATCHER. This is a bad idea: apart from introducing an extra global variable unnecessarily, it means that Sandbox::pci_device_map and GLOBAL_DEVICE_WATCHER are protected by separate mutexes. Since the information in these two structures has to be kept in sync with each other, it makes much more sense to keep them both under the same single Sandbox mutex. Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
This commit is contained in:
		| @@ -17,7 +17,7 @@ use crate::linux_abi::*; | |||||||
| use crate::mount::{DRIVER_BLK_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_SCSI_TYPE}; | use crate::mount::{DRIVER_BLK_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_SCSI_TYPE}; | ||||||
| use crate::pci; | use crate::pci; | ||||||
| use crate::sandbox::Sandbox; | use crate::sandbox::Sandbox; | ||||||
| use crate::{AGENT_CONFIG, GLOBAL_DEVICE_WATCHER}; | use crate::AGENT_CONFIG; | ||||||
| use anyhow::{anyhow, Result}; | use anyhow::{anyhow, Result}; | ||||||
| use oci::{LinuxDeviceCgroup, LinuxResources, Spec}; | use oci::{LinuxDeviceCgroup, LinuxResources, Spec}; | ||||||
| use protocols::agent::Device; | use protocols::agent::Device; | ||||||
| @@ -88,16 +88,13 @@ fn pcipath_to_sysfs(root_bus_sysfs: &str, pcipath: &pci::Path) -> Result<String> | |||||||
| } | } | ||||||
|  |  | ||||||
| async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Result<String> { | async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Result<String> { | ||||||
|     // Keep the same lock order as uevent::handle_block_add_event(), otherwise it may cause deadlock. |     let mut sb = sandbox.lock().await; | ||||||
|     let mut w = GLOBAL_DEVICE_WATCHER.lock().await; |  | ||||||
|     let sb = sandbox.lock().await; |  | ||||||
|     for (key, value) in sb.pci_device_map.iter() { |     for (key, value) in sb.pci_device_map.iter() { | ||||||
|         if key.contains(dev_addr) { |         if key.contains(dev_addr) { | ||||||
|             info!(sl!(), "Device {} found in pci device map", dev_addr); |             info!(sl!(), "Device {} found in pci device map", dev_addr); | ||||||
|             return Ok(format!("{}/{}", SYSTEM_DEV_PATH, value)); |             return Ok(format!("{}/{}", SYSTEM_DEV_PATH, value)); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|     drop(sb); |  | ||||||
|  |  | ||||||
|     // If device is not found in the device map, hotplug event has not |     // If device is not found in the device map, hotplug event has not | ||||||
|     // been received yet, create and add channel to the watchers map. |     // been received yet, create and add channel to the watchers map. | ||||||
| @@ -105,8 +102,8 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul | |||||||
|     // Note this is done inside the lock, not to miss any events from the |     // Note this is done inside the lock, not to miss any events from the | ||||||
|     // global udev listener. |     // global udev listener. | ||||||
|     let (tx, rx) = tokio::sync::oneshot::channel::<String>(); |     let (tx, rx) = tokio::sync::oneshot::channel::<String>(); | ||||||
|     w.insert(dev_addr.to_string(), Some(tx)); |     sb.dev_watcher.insert(dev_addr.to_string(), tx); | ||||||
|     drop(w); |     drop(sb); // unlock | ||||||
|  |  | ||||||
|     info!(sl!(), "Waiting on channel for device notification\n"); |     info!(sl!(), "Waiting on channel for device notification\n"); | ||||||
|     let hotplug_timeout = AGENT_CONFIG.read().await.hotplug_timeout; |     let hotplug_timeout = AGENT_CONFIG.read().await.hotplug_timeout; | ||||||
| @@ -114,9 +111,8 @@ async fn get_device_name(sandbox: &Arc<Mutex<Sandbox>>, dev_addr: &str) -> Resul | |||||||
|     let dev_name = match tokio::time::timeout(hotplug_timeout, rx).await { |     let dev_name = match tokio::time::timeout(hotplug_timeout, rx).await { | ||||||
|         Ok(v) => v?, |         Ok(v) => v?, | ||||||
|         Err(_) => { |         Err(_) => { | ||||||
|             let watcher = GLOBAL_DEVICE_WATCHER.clone(); |             let mut sb = sandbox.lock().await; | ||||||
|             let mut w = watcher.lock().await; |             sb.dev_watcher.remove_entry(dev_addr); | ||||||
|             w.remove_entry(dev_addr); |  | ||||||
|  |  | ||||||
|             return Err(anyhow!( |             return Err(anyhow!( | ||||||
|                 "Timeout reached after {:?} waiting for device {}", |                 "Timeout reached after {:?} waiting for device {}", | ||||||
| @@ -800,21 +796,23 @@ mod tests { | |||||||
|         sb.pci_device_map.remove(&devpath); |         sb.pci_device_map.remove(&devpath); | ||||||
|         drop(sb); // unlock |         drop(sb); // unlock | ||||||
|  |  | ||||||
|  |         let watcher_sandbox = Arc::clone(&sandbox); | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             loop { |             loop { | ||||||
|                 let mut w = GLOBAL_DEVICE_WATCHER.lock().await; |                 let mut sb = watcher_sandbox.lock().await; | ||||||
|                 let matched_key = w |                 let matched_key = sb | ||||||
|  |                     .dev_watcher | ||||||
|                     .keys() |                     .keys() | ||||||
|                     .filter(|dev_addr| devpath.contains(*dev_addr)) |                     .filter(|dev_addr| devpath.contains(*dev_addr)) | ||||||
|                     .cloned() |                     .cloned() | ||||||
|                     .next(); |                     .next(); | ||||||
|  |  | ||||||
|                 if let Some(k) = matched_key { |                 if let Some(k) = matched_key { | ||||||
|                     let sender = w.remove(&k).unwrap().unwrap(); |                     let sender = sb.dev_watcher.remove(&k).unwrap(); | ||||||
|                     let _ = sender.send(devname.to_string()); |                     let _ = sender.send(devname.to_string()); | ||||||
|                     return; |                     return; | ||||||
|                 } |                 } | ||||||
|                 drop(w); // unlock |                 drop(sb); // unlock | ||||||
|             } |             } | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -28,7 +28,6 @@ use nix::sys::select::{select, FdSet}; | |||||||
| use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; | use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; | ||||||
| use nix::sys::wait; | use nix::sys::wait; | ||||||
| use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; | use nix::unistd::{self, close, dup, dup2, fork, setsid, ForkResult}; | ||||||
| use std::collections::HashMap; |  | ||||||
| use std::env; | use std::env; | ||||||
| use std::ffi::{CStr, CString, OsStr}; | use std::ffi::{CStr, CString, OsStr}; | ||||||
| use std::fs::{self, File}; | use std::fs::{self, File}; | ||||||
| @@ -72,7 +71,6 @@ use rustjail::pipestream::PipeStream; | |||||||
| use tokio::{ | use tokio::{ | ||||||
|     io::AsyncWrite, |     io::AsyncWrite, | ||||||
|     sync::{ |     sync::{ | ||||||
|         oneshot::Sender, |  | ||||||
|         watch::{channel, Receiver}, |         watch::{channel, Receiver}, | ||||||
|         Mutex, RwLock, |         Mutex, RwLock, | ||||||
|     }, |     }, | ||||||
| @@ -89,8 +87,6 @@ const CONSOLE_PATH: &str = "/dev/console"; | |||||||
| const DEFAULT_BUF_SIZE: usize = 8 * 1024; | const DEFAULT_BUF_SIZE: usize = 8 * 1024; | ||||||
|  |  | ||||||
| lazy_static! { | lazy_static! { | ||||||
|     static ref GLOBAL_DEVICE_WATCHER: Arc<Mutex<HashMap<String, Option<Sender<String>>>>> = |  | ||||||
|         Arc::new(Mutex::new(HashMap::new())); |  | ||||||
|     static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> = |     static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> = | ||||||
|         Arc::new(RwLock::new(config::AgentConfig::new())); |         Arc::new(RwLock::new(config::AgentConfig::new())); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -37,6 +37,7 @@ pub struct Sandbox { | |||||||
|     pub mounts: Vec<String>, |     pub mounts: Vec<String>, | ||||||
|     pub container_mounts: HashMap<String, Vec<String>>, |     pub container_mounts: HashMap<String, Vec<String>>, | ||||||
|     pub pci_device_map: HashMap<String, String>, |     pub pci_device_map: HashMap<String, String>, | ||||||
|  |     pub dev_watcher: HashMap<String, tokio::sync::oneshot::Sender<String>>, | ||||||
|     pub shared_utsns: Namespace, |     pub shared_utsns: Namespace, | ||||||
|     pub shared_ipcns: Namespace, |     pub shared_ipcns: Namespace, | ||||||
|     pub sandbox_pidns: Option<Namespace>, |     pub sandbox_pidns: Option<Namespace>, | ||||||
| @@ -66,6 +67,7 @@ impl Sandbox { | |||||||
|             mounts: Vec::new(), |             mounts: Vec::new(), | ||||||
|             container_mounts: HashMap::new(), |             container_mounts: HashMap::new(), | ||||||
|             pci_device_map: HashMap::new(), |             pci_device_map: HashMap::new(), | ||||||
|  |             dev_watcher: HashMap::new(), | ||||||
|             shared_utsns: Namespace::new(&logger), |             shared_utsns: Namespace::new(&logger), | ||||||
|             shared_ipcns: Namespace::new(&logger), |             shared_ipcns: Namespace::new(&logger), | ||||||
|             sandbox_pidns: None, |             sandbox_pidns: None, | ||||||
|   | |||||||
| @@ -6,7 +6,6 @@ | |||||||
| use crate::device::online_device; | use crate::device::online_device; | ||||||
| use crate::linux_abi::*; | use crate::linux_abi::*; | ||||||
| use crate::sandbox::Sandbox; | use crate::sandbox::Sandbox; | ||||||
| use crate::GLOBAL_DEVICE_WATCHER; |  | ||||||
| use slog::Logger; | use slog::Logger; | ||||||
|  |  | ||||||
| use anyhow::Result; | use anyhow::Result; | ||||||
| @@ -66,10 +65,6 @@ impl Uevent { | |||||||
|  |  | ||||||
|     async fn handle_block_add_event(&self, sandbox: &Arc<Mutex<Sandbox>>) { |     async fn handle_block_add_event(&self, sandbox: &Arc<Mutex<Sandbox>>) { | ||||||
|         let pci_root_bus_path = create_pci_root_bus_path(); |         let pci_root_bus_path = create_pci_root_bus_path(); | ||||||
|  |  | ||||||
|         // Keep the same lock order as device::get_device_name(), otherwise it may cause deadlock. |  | ||||||
|         let watcher = GLOBAL_DEVICE_WATCHER.clone(); |  | ||||||
|         let mut w = watcher.lock().await; |  | ||||||
|         let mut sb = sandbox.lock().await; |         let mut sb = sandbox.lock().await; | ||||||
|  |  | ||||||
|         // Add the device node name to the pci device map. |         // Add the device node name to the pci device map. | ||||||
| @@ -79,9 +74,10 @@ impl Uevent { | |||||||
|         // Notify watchers that are interested in the udev event. |         // Notify watchers that are interested in the udev event. | ||||||
|         // Close the channel after watcher has been notified. |         // Close the channel after watcher has been notified. | ||||||
|         let devpath = self.devpath.clone(); |         let devpath = self.devpath.clone(); | ||||||
|         let empties: Vec<_> = w |         let keys: Vec<_> = sb | ||||||
|             .iter_mut() |             .dev_watcher | ||||||
|             .filter(|(dev_addr, _)| { |             .keys() | ||||||
|  |             .filter(|dev_addr| { | ||||||
|                 let pci_p = format!("{}{}", pci_root_bus_path, *dev_addr); |                 let pci_p = format!("{}{}", pci_root_bus_path, *dev_addr); | ||||||
|  |  | ||||||
|                 // blk block device |                 // blk block device | ||||||
| @@ -99,17 +95,16 @@ impl Uevent { | |||||||
|                         dev_addr.ends_with(pmem_suffix.as_str()) |                         dev_addr.ends_with(pmem_suffix.as_str()) | ||||||
|                 } |                 } | ||||||
|             }) |             }) | ||||||
|             .map(|(k, sender)| { |             .cloned() | ||||||
|                 let devname = self.devname.clone(); |  | ||||||
|                 let sender = sender.take().unwrap(); |  | ||||||
|                 let _ = sender.send(devname); |  | ||||||
|                 k.clone() |  | ||||||
|             }) |  | ||||||
|             .collect(); |             .collect(); | ||||||
|  |  | ||||||
|         // Remove notified nodes from the watcher map. |         for k in keys { | ||||||
|         for empty in empties { |             let devname = self.devname.clone(); | ||||||
|             w.remove(&empty); |             // unwrap() is safe because logic above ensures k exists | ||||||
|  |             // in the map, and it's locked so no-one else can change | ||||||
|  |             // that | ||||||
|  |             let sender = sb.dev_watcher.remove(&k).unwrap(); | ||||||
|  |             let _ = sender.send(devname); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user