agent: simplify storage device by removing StorageDeviceObject

Simplify storage device implementation by removing StorageDeviceObject.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-25 17:21:03 +08:00
parent 0e7248264d
commit aaa5ab1264
8 changed files with 47 additions and 80 deletions

View File

@ -10,6 +10,7 @@ use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::{thread, time};
@ -43,11 +44,10 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
pub type StorageDeviceObject = Arc<Mutex<dyn StorageDevice>>;
#[derive(Clone)]
pub struct StorageState {
inner: StorageDeviceObject,
count: Arc<AtomicU32>,
device: Arc<dyn StorageDevice>,
}
impl Debug for StorageState {
@ -59,24 +59,28 @@ impl Debug for StorageState {
impl StorageState {
fn new() -> Self {
StorageState {
inner: Arc::new(Mutex::new(StorageDeviceGeneric::new("".to_string()))),
count: Arc::new(AtomicU32::new(1)),
device: Arc::new(StorageDeviceGeneric::new("".to_string())),
}
}
pub fn from_device(device: StorageDeviceObject) -> Self {
Self { inner: device }
pub fn from_device(device: Arc<dyn StorageDevice>) -> Self {
Self {
count: Arc::new(AtomicU32::new(1)),
device,
}
}
pub async fn ref_count(&self) -> u32 {
self.inner.lock().await.ref_count()
self.count.load(Ordering::Relaxed)
}
async fn inc_ref_count(&self) {
self.inner.lock().await.inc_ref_count()
self.count.fetch_add(1, Ordering::Acquire);
}
async fn dec_and_test_ref_count(&self) -> bool {
self.inner.lock().await.dec_and_test_ref_count()
self.count.fetch_sub(1, Ordering::AcqRel) == 1
}
}
@ -162,8 +166,8 @@ impl Sandbox {
pub fn update_sandbox_storage(
&mut self,
path: &str,
device: StorageDeviceObject,
) -> std::result::Result<StorageDeviceObject, StorageDeviceObject> {
device: Arc<dyn StorageDevice>,
) -> std::result::Result<Arc<dyn StorageDevice>, Arc<dyn StorageDevice>> {
if !self.storages.contains_key(path) {
return Err(device);
}
@ -171,7 +175,7 @@ impl Sandbox {
let state = StorageState::from_device(device);
// Safe to unwrap() because we have just ensured existence of entry.
let state = self.storages.insert(path.to_string(), state).unwrap();
Ok(state.inner)
Ok(state.device)
}
// Clean mount and directory of a mountpoint.

View File

@ -5,11 +5,12 @@
//
use anyhow::Result;
use kata_types::mount::StorageDevice;
use protocols::agent::Storage;
use std::iter;
use std::sync::Arc;
use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{new_device, StorageContext, StorageHandler};
#[derive(Debug)]
@ -22,7 +23,7 @@ impl StorageHandler for BindWatcherHandler {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
if let Some(cid) = ctx.cid {
ctx.sandbox
.lock()

View File

@ -8,8 +8,10 @@ use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use kata_types::mount::StorageDevice;
use protocols::agent::Storage;
use tracing::instrument;
@ -18,7 +20,6 @@ use crate::device::{
wait_for_pmem_device,
};
use crate::pci;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler};
#[cfg(target_arch = "s390x")]
use crate::{ccw, device::get_virtio_blk_ccw_device_name};
@ -33,7 +34,7 @@ impl StorageHandler for VirtioBlkMmioHandler {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
if !Path::new(&storage.source).exists() {
get_virtio_mmio_device_name(ctx.sandbox, &storage.source)
.await
@ -54,7 +55,7 @@ impl StorageHandler for VirtioBlkPciHandler {
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
// If hot-plugged, get the device node path based on the PCI path
// otherwise use the virt path provided in Storage Source
if storage.source.starts_with("/dev") {
@ -86,7 +87,7 @@ impl StorageHandler for VirtioBlkCcwHandler {
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
let ccw_device = ccw::Device::from_str(&storage.source)?;
let dev_path = get_virtio_blk_ccw_device_name(ctx.sandbox, &ccw_device).await?;
storage.source = dev_path;
@ -100,7 +101,7 @@ impl StorageHandler for VirtioBlkCcwHandler {
&self,
_storage: Storage,
_ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
Err(anyhow!("CCW is only supported on s390x"))
}
}
@ -115,7 +116,7 @@ impl StorageHandler for ScsiHandler {
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
// Retrieve the device path from SCSI address.
let dev_path = get_scsi_device_name(ctx.sandbox, &storage.source).await?;
storage.source = dev_path;
@ -135,7 +136,7 @@ impl StorageHandler for PmemHandler {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
// Retrieve the device for pmem storage
wait_for_pmem_device(ctx.sandbox, &storage.source).await?;

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use kata_sys_util::mount::parse_mount_options;
use kata_types::mount::KATA_MOUNT_OPTION_FS_GID;
use kata_types::mount::{StorageDevice, KATA_MOUNT_OPTION_FS_GID};
use nix::unistd::Gid;
use protocols::agent::Storage;
use slog::Logger;
@ -22,7 +22,7 @@ use tracing::instrument;
use crate::device::{DRIVER_EPHEMERAL_TYPE, FS_TYPE_HUGETLB};
use crate::mount::baremount;
use crate::sandbox::{Sandbox, StorageDeviceObject};
use crate::sandbox::Sandbox;
use crate::storage::{
common_storage_handler, new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID,
};
@ -40,7 +40,7 @@ impl StorageHandler for EphemeralHandler {
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
// hugetlbfs
if storage.fstype == FS_TYPE_HUGETLB {
info!(ctx.logger, "handle hugetlbfs storage");

View File

@ -6,12 +6,13 @@
use std::fs;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use kata_types::mount::StorageDevice;
use protocols::agent::Storage;
use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{common_storage_handler, new_device, StorageContext, StorageHandler};
#[derive(Debug)]
@ -24,7 +25,7 @@ impl StorageHandler for OverlayfsHandler {
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
if storage
.options
.iter()
@ -65,7 +66,7 @@ impl StorageHandler for Virtio9pHandler {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
@ -81,7 +82,7 @@ impl StorageHandler for VirtioFsHandler {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}

View File

@ -6,14 +6,14 @@
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use anyhow::{Context, Result};
use kata_types::mount::KATA_MOUNT_OPTION_FS_GID;
use kata_types::mount::{StorageDevice, KATA_MOUNT_OPTION_FS_GID};
use nix::unistd::Gid;
use protocols::agent::Storage;
use tracing::instrument;
use crate::sandbox::StorageDeviceObject;
use crate::storage::{new_device, parse_options, StorageContext, StorageHandler, MODE_SETGID};
#[derive(Debug)]
@ -26,7 +26,7 @@ impl StorageHandler for LocalHandler {
&self,
storage: Storage,
_ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
) -> Result<Arc<dyn StorageDevice>> {
fs::create_dir_all(&storage.mount_point).context(format!(
"failed to create dir all {:?}",
&storage.mount_point

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use kata_sys_util::mount::{create_mount_destination, parse_mount_options};
use kata_types::mount::{
StorageDeviceGeneric, StorageHandlerManager, KATA_SHAREDFS_GUEST_PREMOUNT_TAG,
StorageDevice, StorageDeviceGeneric, StorageHandlerManager, KATA_SHAREDFS_GUEST_PREMOUNT_TAG,
};
use nix::unistd::{Gid, Uid};
use protocols::agent::Storage;
@ -33,7 +33,7 @@ use crate::device::{
DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE,
};
use crate::mount::{baremount, is_mounted};
use crate::sandbox::{Sandbox, StorageDeviceObject};
use crate::sandbox::Sandbox;
pub use self::ephemeral_handler::update_ephemeral_mounts;
@ -63,7 +63,7 @@ pub trait StorageHandler: Send + Sync {
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject>;
) -> Result<Arc<dyn StorageDevice>>;
}
#[rustfmt::skip]
@ -124,7 +124,7 @@ pub async fn add_storages(
.update_sandbox_storage(&path, device.clone())
{
Ok(d) => {
let path = device.lock().await.path().to_string();
let path = device.path().to_string();
if !path.is_empty() {
mount_list.push(path.clone());
}
@ -136,7 +136,7 @@ pub async fn add_storages(
{
warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
}
device.lock().await.cleanup();
device.cleanup();
return Err(anyhow!("failed to update device for storage"));
}
}
@ -160,9 +160,9 @@ pub async fn add_storages(
Ok(mount_list)
}
pub(crate) fn new_device(path: String) -> Result<StorageDeviceObject> {
pub(crate) fn new_device(path: String) -> Result<Arc<dyn StorageDevice>> {
let device = StorageDeviceGeneric::new(path);
Ok(Arc::new(Mutex::new(device)))
Ok(Arc::new(device))
}
#[instrument]

View File

@ -431,14 +431,13 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume {
/// An implementation of generic storage device.
pub struct StorageDeviceGeneric {
refcount: u32,
path: String,
}
impl std::fmt::Debug for StorageDeviceGeneric {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageState")
.field("refcount", &self.refcount)
.field("path", &self.path)
.finish()
}
}
@ -446,7 +445,7 @@ impl std::fmt::Debug for StorageDeviceGeneric {
impl StorageDeviceGeneric {
/// Create a new instance of `StorageStateCommon`.
pub fn new(path: String) -> Self {
StorageDeviceGeneric { refcount: 1, path }
StorageDeviceGeneric { path }
}
}
@ -455,20 +454,6 @@ impl StorageDevice for StorageDeviceGeneric {
&self.path
}
fn ref_count(&self) -> u32 {
self.refcount
}
fn inc_ref_count(&mut self) {
self.refcount += 1;
}
fn dec_and_test_ref_count(&mut self) -> bool {
assert!(self.refcount > 0);
self.refcount -= 1;
self.refcount == 0
}
fn cleanup(&self) {}
}
@ -477,15 +462,6 @@ pub trait StorageDevice: Send + Sync {
/// Path
fn path(&self) -> &str;
/// Get reference count.
fn ref_count(&self) -> u32;
/// Increase reference count.
fn inc_ref_count(&mut self);
/// Decrease reference count and return true if it reaches zero.
fn dec_and_test_ref_count(&mut self) -> bool;
/// Clean up resources related to the storage device.
fn cleanup(&self);
}
@ -763,20 +739,4 @@ mod tests {
);
assert_eq!(volume.fs_type.as_str(), "rafsv6")
}
#[test]
fn test_storage_state_common() {
let mut state = StorageDeviceGeneric::new("".to_string());
assert_eq!(state.ref_count(), 1);
state.inc_ref_count();
assert_eq!(state.ref_count(), 2);
state.inc_ref_count();
assert_eq!(state.ref_count(), 3);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 2);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 1);
assert!(state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 0);
}
}