agent: simplify storage device by removing StorageDeviceObject

Simplify storage device implementation by removing StorageDeviceObject.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-25 17:21:03 +08:00
parent 0e7248264d
commit aaa5ab1264
8 changed files with 47 additions and 80 deletions

View File

@ -10,6 +10,7 @@ use std::fs;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{thread, time}; use std::{thread, time};
@ -43,11 +44,10 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>); type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
pub type StorageDeviceObject = Arc<Mutex<dyn StorageDevice>>;
#[derive(Clone)] #[derive(Clone)]
pub struct StorageState { pub struct StorageState {
inner: StorageDeviceObject, count: Arc<AtomicU32>,
device: Arc<dyn StorageDevice>,
} }
impl Debug for StorageState { impl Debug for StorageState {
@ -59,24 +59,28 @@ impl Debug for StorageState {
impl StorageState { impl StorageState {
fn new() -> Self { fn new() -> Self {
StorageState { StorageState {
inner: Arc::new(Mutex::new(StorageDeviceGeneric::new("".to_string()))), count: Arc::new(AtomicU32::new(1)),
device: Arc::new(StorageDeviceGeneric::new("".to_string())),
} }
} }
pub fn from_device(device: StorageDeviceObject) -> Self { pub fn from_device(device: Arc<dyn StorageDevice>) -> Self {
Self { inner: device } Self {
count: Arc::new(AtomicU32::new(1)),
device,
}
} }
pub async fn ref_count(&self) -> u32 { pub async fn ref_count(&self) -> u32 {
self.inner.lock().await.ref_count() self.count.load(Ordering::Relaxed)
} }
async fn inc_ref_count(&self) { async fn inc_ref_count(&self) {
self.inner.lock().await.inc_ref_count() self.count.fetch_add(1, Ordering::Acquire);
} }
async fn dec_and_test_ref_count(&self) -> bool { async fn dec_and_test_ref_count(&self) -> bool {
self.inner.lock().await.dec_and_test_ref_count() self.count.fetch_sub(1, Ordering::AcqRel) == 1
} }
} }
@ -162,8 +166,8 @@ impl Sandbox {
pub fn update_sandbox_storage( pub fn update_sandbox_storage(
&mut self, &mut self,
path: &str, path: &str,
device: StorageDeviceObject, device: Arc<dyn StorageDevice>,
) -> std::result::Result<StorageDeviceObject, StorageDeviceObject> { ) -> std::result::Result<Arc<dyn StorageDevice>, Arc<dyn StorageDevice>> {
if !self.storages.contains_key(path) { if !self.storages.contains_key(path) {
return Err(device); return Err(device);
} }
@ -171,7 +175,7 @@ impl Sandbox {
let state = StorageState::from_device(device); let state = StorageState::from_device(device);
// Safe to unwrap() because we have just ensured existence of entry. // Safe to unwrap() because we have just ensured existence of entry.
let state = self.storages.insert(path.to_string(), state).unwrap(); let state = self.storages.insert(path.to_string(), state).unwrap();
Ok(state.inner) Ok(state.device)
} }
// Clean mount and directory of a mountpoint. // Clean mount and directory of a mountpoint.

View File

@ -5,11 +5,12 @@
// //
use anyhow::Result; use anyhow::Result;
use kata_types::mount::StorageDevice;
use protocols::agent::Storage; use protocols::agent::Storage;
use std::iter; use std::iter;
use std::sync::Arc;
use tracing::instrument; use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{new_device, StorageContext, StorageHandler}; use crate::storage::{new_device, StorageContext, StorageHandler};
#[derive(Debug)] #[derive(Debug)]
@ -22,7 +23,7 @@ impl StorageHandler for BindWatcherHandler {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
if let Some(cid) = ctx.cid { if let Some(cid) = ctx.cid {
ctx.sandbox ctx.sandbox
.lock() .lock()

View File

@ -8,8 +8,10 @@ use std::fs;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_types::mount::StorageDevice;
use protocols::agent::Storage; use protocols::agent::Storage;
use tracing::instrument; use tracing::instrument;
@ -18,7 +20,6 @@ use crate::device::{
wait_for_pmem_device, wait_for_pmem_device,
}; };
use crate::pci; use crate::pci;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler}; use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler};
#[cfg(target_arch = "s390x")] #[cfg(target_arch = "s390x")]
use crate::{ccw, device::get_virtio_blk_ccw_device_name}; use crate::{ccw, device::get_virtio_blk_ccw_device_name};
@ -33,7 +34,7 @@ impl StorageHandler for VirtioBlkMmioHandler {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
if !Path::new(&storage.source).exists() { if !Path::new(&storage.source).exists() {
get_virtio_mmio_device_name(ctx.sandbox, &storage.source) get_virtio_mmio_device_name(ctx.sandbox, &storage.source)
.await .await
@ -54,7 +55,7 @@ impl StorageHandler for VirtioBlkPciHandler {
&self, &self,
mut storage: Storage, mut storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
// If hot-plugged, get the device node path based on the PCI path // If hot-plugged, get the device node path based on the PCI path
// otherwise use the virt path provided in Storage Source // otherwise use the virt path provided in Storage Source
if storage.source.starts_with("/dev") { if storage.source.starts_with("/dev") {
@ -86,7 +87,7 @@ impl StorageHandler for VirtioBlkCcwHandler {
&self, &self,
mut storage: Storage, mut storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
let ccw_device = ccw::Device::from_str(&storage.source)?; let ccw_device = ccw::Device::from_str(&storage.source)?;
let dev_path = get_virtio_blk_ccw_device_name(ctx.sandbox, &ccw_device).await?; let dev_path = get_virtio_blk_ccw_device_name(ctx.sandbox, &ccw_device).await?;
storage.source = dev_path; storage.source = dev_path;
@ -100,7 +101,7 @@ impl StorageHandler for VirtioBlkCcwHandler {
&self, &self,
_storage: Storage, _storage: Storage,
_ctx: &mut StorageContext, _ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
Err(anyhow!("CCW is only supported on s390x")) Err(anyhow!("CCW is only supported on s390x"))
} }
} }
@ -115,7 +116,7 @@ impl StorageHandler for ScsiHandler {
&self, &self,
mut storage: Storage, mut storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
// Retrieve the device path from SCSI address. // Retrieve the device path from SCSI address.
let dev_path = get_scsi_device_name(ctx.sandbox, &storage.source).await?; let dev_path = get_scsi_device_name(ctx.sandbox, &storage.source).await?;
storage.source = dev_path; storage.source = dev_path;
@ -135,7 +136,7 @@ impl StorageHandler for PmemHandler {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
// Retrieve the device for pmem storage // Retrieve the device for pmem storage
wait_for_pmem_device(ctx.sandbox, &storage.source).await?; wait_for_pmem_device(ctx.sandbox, &storage.source).await?;

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_sys_util::mount::parse_mount_options; use kata_sys_util::mount::parse_mount_options;
use kata_types::mount::KATA_MOUNT_OPTION_FS_GID; use kata_types::mount::{StorageDevice, KATA_MOUNT_OPTION_FS_GID};
use nix::unistd::Gid; use nix::unistd::Gid;
use protocols::agent::Storage; use protocols::agent::Storage;
use slog::Logger; use slog::Logger;
@ -22,7 +22,7 @@ use tracing::instrument;
use crate::device::{DRIVER_EPHEMERAL_TYPE, FS_TYPE_HUGETLB}; use crate::device::{DRIVER_EPHEMERAL_TYPE, FS_TYPE_HUGETLB};
use crate::mount::baremount; use crate::mount::baremount;
use crate::sandbox::{Sandbox, StorageDeviceObject}; use crate::sandbox::Sandbox;
use crate::storage::{ use crate::storage::{
common_storage_handler, new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID, common_storage_handler, new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID,
}; };
@ -40,7 +40,7 @@ impl StorageHandler for EphemeralHandler {
&self, &self,
mut storage: Storage, mut storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
// hugetlbfs // hugetlbfs
if storage.fstype == FS_TYPE_HUGETLB { if storage.fstype == FS_TYPE_HUGETLB {
info!(ctx.logger, "handle hugetlbfs storage"); info!(ctx.logger, "handle hugetlbfs storage");

View File

@ -6,12 +6,13 @@
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_types::mount::StorageDevice;
use protocols::agent::Storage; use protocols::agent::Storage;
use tracing::instrument; use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler}; use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler};
#[derive(Debug)] #[derive(Debug)]
@ -24,7 +25,7 @@ impl StorageHandler for OverlayfsHandler {
&self, &self,
mut storage: Storage, mut storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
if storage if storage
.options .options
.iter() .iter()
@ -65,7 +66,7 @@ impl StorageHandler for Virtio9pHandler {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
let path = common_storage_handler(ctx.logger, &storage)?; let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path) new_device(path)
} }
@ -81,7 +82,7 @@ impl StorageHandler for VirtioFsHandler {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
let path = common_storage_handler(ctx.logger, &storage)?; let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path) new_device(path)
} }

View File

@ -6,14 +6,14 @@
use std::fs; use std::fs;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use kata_types::mount::KATA_MOUNT_OPTION_FS_GID; use kata_types::mount::{StorageDevice, KATA_MOUNT_OPTION_FS_GID};
use nix::unistd::Gid; use nix::unistd::Gid;
use protocols::agent::Storage; use protocols::agent::Storage;
use tracing::instrument; use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID}; use crate::storage::{new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID};
#[derive(Debug)] #[derive(Debug)]
@ -26,7 +26,7 @@ impl StorageHandler for LocalHandler {
&self, &self,
storage: Storage, storage: Storage,
_ctx: &mut StorageContext, _ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> { ) -> Result<Arc<dyn StorageDevice>> {
fs::create_dir_all(&storage.mount_point).context(format!( fs::create_dir_all(&storage.mount_point).context(format!(
"failed to create dir all {:?}", "failed to create dir all {:?}",
&storage.mount_point &storage.mount_point

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_sys_util::mount::{create_mount_destination, parse_mount_options}; use kata_sys_util::mount::{create_mount_destination, parse_mount_options};
use kata_types::mount::{ use kata_types::mount::{
StorageDeviceGeneric, StorageHandlerManager, KATA_SHAREDFS_GUEST_PREMOUNT_TAG, StorageDevice, StorageDeviceGeneric, StorageHandlerManager, KATA_SHAREDFS_GUEST_PREMOUNT_TAG,
}; };
use nix::unistd::{Gid, Uid}; use nix::unistd::{Gid, Uid};
use protocols::agent::Storage; use protocols::agent::Storage;
@ -33,7 +33,7 @@ use crate::device::{
DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE, DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE,
}; };
use crate::mount::{baremount, is_mounted}; use crate::mount::{baremount, is_mounted};
use crate::sandbox::{Sandbox, StorageDeviceObject}; use crate::sandbox::Sandbox;
pub use self::ephemeral_handler::update_ephemeral_mounts; pub use self::ephemeral_handler::update_ephemeral_mounts;
@ -63,7 +63,7 @@ pub trait StorageHandler: Send + Sync {
&self, &self,
storage: Storage, storage: Storage,
ctx: &mut StorageContext, ctx: &mut StorageContext,
) -> Result<StorageDeviceObject>; ) -> Result<Arc<dyn StorageDevice>>;
} }
#[rustfmt::skip] #[rustfmt::skip]
@ -124,7 +124,7 @@ pub async fn add_storages(
.update_sandbox_storage(&path, device.clone()) .update_sandbox_storage(&path, device.clone())
{ {
Ok(d) => { Ok(d) => {
let path = device.lock().await.path().to_string(); let path = device.path().to_string();
if !path.is_empty() { if !path.is_empty() {
mount_list.push(path.clone()); mount_list.push(path.clone());
} }
@ -136,7 +136,7 @@ pub async fn add_storages(
{ {
warn!(logger, "failed to remove dummy sandbox storage {:?}", e); warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
} }
device.lock().await.cleanup(); device.cleanup();
return Err(anyhow!("failed to update device for storage")); return Err(anyhow!("failed to update device for storage"));
} }
} }
@ -160,9 +160,9 @@ pub async fn add_storages(
Ok(mount_list) Ok(mount_list)
} }
pub(crate) fn new_device(path: String) -> Result<StorageDeviceObject> { pub(crate) fn new_device(path: String) -> Result<Arc<dyn StorageDevice>> {
let device = StorageDeviceGeneric::new(path); let device = StorageDeviceGeneric::new(path);
Ok(Arc::new(Mutex::new(device))) Ok(Arc::new(device))
} }
#[instrument] #[instrument]

View File

@ -431,14 +431,13 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume {
/// An implementation of generic storage device. /// An implementation of generic storage device.
pub struct StorageDeviceGeneric { pub struct StorageDeviceGeneric {
refcount: u32,
path: String, path: String,
} }
impl std::fmt::Debug for StorageDeviceGeneric { impl std::fmt::Debug for StorageDeviceGeneric {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageState") f.debug_struct("StorageState")
.field("refcount", &self.refcount) .field("path", &self.path)
.finish() .finish()
} }
} }
@ -446,7 +445,7 @@ impl std::fmt::Debug for StorageDeviceGeneric {
impl StorageDeviceGeneric { impl StorageDeviceGeneric {
/// Create a new instance of `StorageStateCommon`. /// Create a new instance of `StorageStateCommon`.
pub fn new(path: String) -> Self { pub fn new(path: String) -> Self {
StorageDeviceGeneric { refcount: 1, path } StorageDeviceGeneric { path }
} }
} }
@ -455,20 +454,6 @@ impl StorageDevice for StorageDeviceGeneric {
&self.path &self.path
} }
fn ref_count(&self) -> u32 {
self.refcount
}
fn inc_ref_count(&mut self) {
self.refcount += 1;
}
fn dec_and_test_ref_count(&mut self) -> bool {
assert!(self.refcount > 0);
self.refcount -= 1;
self.refcount == 0
}
fn cleanup(&self) {} fn cleanup(&self) {}
} }
@ -477,15 +462,6 @@ pub trait StorageDevice: Send + Sync {
/// Path /// Path
fn path(&self) -> &str; fn path(&self) -> &str;
/// Get reference count.
fn ref_count(&self) -> u32;
/// Increase reference count.
fn inc_ref_count(&mut self);
/// Decrease reference count and return true if it reaches zero.
fn dec_and_test_ref_count(&mut self) -> bool;
/// Clean up resources related to the storage device. /// Clean up resources related to the storage device.
fn cleanup(&self); fn cleanup(&self);
} }
@ -763,20 +739,4 @@ mod tests {
); );
assert_eq!(volume.fs_type.as_str(), "rafsv6") assert_eq!(volume.fs_type.as_str(), "rafsv6")
} }
#[test]
fn test_storage_state_common() {
let mut state = StorageDeviceGeneric::new("".to_string());
assert_eq!(state.ref_count(), 1);
state.inc_ref_count();
assert_eq!(state.ref_count(), 2);
state.inc_ref_count();
assert_eq!(state.ref_count(), 3);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 2);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 1);
assert!(state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 0);
}
} }