agent: switch to new storage subsystem

Switch to the new storage subsystem so that a StorageDevice is created
and tracked for each storage object. Each storage driver now implements
the StorageHandler trait and is registered by driver name in the
STORAGE_HANDLERS table; add_storages() looks up the handler, asks it to
create the device, and records the device in the sandbox with reference
counting.
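
A rough sketch of how a driver plugs into the new subsystem (for
illustration only; "FooHandler" and the "foo" driver name are made up,
the real handlers live in mount.rs):

    #[derive(Debug)]
    struct FooHandler {} // hypothetical handler for a "foo" driver

    #[async_trait::async_trait]
    impl StorageHandler for FooHandler {
        async fn create_device(
            &self,
            storage: Storage,
            ctx: &mut StorageContext,
        ) -> Result<StorageDeviceObject> {
            // Mount the storage object, then wrap the guest mount path
            // in a device object the sandbox can reference-count.
            let path = common_storage_handler(ctx.logger, &storage)?;
            new_device(path)
        }
    }

    // Registered in the STORAGE_HANDLERS table:
    //   manager.add_handler("foo", Arc::new(FooHandler{})).unwrap();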

Fixes: #7614

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
Author: Jiang Liu <gerry@linux.alibaba.com>
Date:   2023-08-14 20:26:58 +08:00
parent fcbda0b419
commit 60ca12ccb0
3 changed files with 389 additions and 305 deletions


@@ -35,9 +35,9 @@ const VM_ROOTFS: &str = "/";
 const BLOCK: &str = "block";
 pub const DRIVER_9P_TYPE: &str = "9p";
 pub const DRIVER_VIRTIOFS_TYPE: &str = "virtio-fs";
-pub const DRIVER_BLK_TYPE: &str = "blk";
+pub const DRIVER_BLK_PCI_TYPE: &str = "blk";
 pub const DRIVER_BLK_CCW_TYPE: &str = "blk-ccw";
-pub const DRIVER_MMIO_BLK_TYPE: &str = "mmioblk";
+pub const DRIVER_BLK_MMIO_TYPE: &str = "mmioblk";
 pub const DRIVER_SCSI_TYPE: &str = "scsi";
 pub const DRIVER_NVDIMM_TYPE: &str = "nvdimm";
 pub const DRIVER_EPHEMERAL_TYPE: &str = "ephemeral";
@@ -935,9 +935,9 @@ async fn add_device(device: &Device, sandbox: &Arc<Mutex<Sandbox>>) -> Result<Sp
     }
 
     match device.type_.as_str() {
-        DRIVER_BLK_TYPE => virtio_blk_device_handler(device, sandbox).await,
+        DRIVER_BLK_PCI_TYPE => virtio_blk_device_handler(device, sandbox).await,
         DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_device_handler(device, sandbox).await,
-        DRIVER_MMIO_BLK_TYPE => virtiommio_blk_device_handler(device, sandbox).await,
+        DRIVER_BLK_MMIO_TYPE => virtiommio_blk_device_handler(device, sandbox).await,
         DRIVER_NVDIMM_TYPE => virtio_nvdimm_device_handler(device, sandbox).await,
         DRIVER_SCSI_TYPE => virtio_scsi_device_handler(device, sandbox).await,
         DRIVER_VFIO_PCI_GK_TYPE | DRIVER_VFIO_PCI_TYPE => {


@@ -3,6 +3,8 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 
+#![allow(dead_code)]
+
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fs::{self, File, OpenOptions};
@@ -16,7 +18,10 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context, Result};
 use kata_sys_util::mount::{create_mount_destination, get_linux_mount_info, parse_mount_options};
-use kata_types::mount::{KATA_MOUNT_OPTION_FS_GID, KATA_SHAREDFS_GUEST_PREMOUNT_TAG};
+use kata_types::mount::{
+    StorageDeviceGeneric, StorageHandlerManager, KATA_MOUNT_OPTION_FS_GID,
+    KATA_SHAREDFS_GUEST_PREMOUNT_TAG,
+};
 use nix::mount::MsFlags;
 use nix::unistd::{Gid, Uid};
 use regex::Regex;
@@ -26,15 +31,15 @@ use tracing::instrument;
 use crate::device::{
     get_scsi_device_name, get_virtio_blk_pci_device_name, get_virtio_mmio_device_name,
-    online_device, wait_for_pmem_device, DRIVER_9P_TYPE, DRIVER_BLK_CCW_TYPE, DRIVER_BLK_TYPE,
-    DRIVER_EPHEMERAL_TYPE, DRIVER_LOCAL_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE,
-    DRIVER_OVERLAYFS_TYPE, DRIVER_SCSI_TYPE, DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE,
-    FS_TYPE_HUGETLB,
+    online_device, wait_for_pmem_device, DRIVER_9P_TYPE, DRIVER_BLK_MMIO_TYPE, DRIVER_BLK_PCI_TYPE,
+    DRIVER_EPHEMERAL_TYPE, DRIVER_LOCAL_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_OVERLAYFS_TYPE,
+    DRIVER_SCSI_TYPE, DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE, FS_TYPE_HUGETLB,
 };
 use crate::linux_abi::*;
 use crate::pci;
 use crate::protocols::agent::Storage;
 use crate::protocols::types::FSGroupChangePolicy;
+use crate::sandbox::StorageDeviceObject;
 use crate::Sandbox;
 #[cfg(target_arch = "s390x")]
 use crate::{ccw, device::get_virtio_blk_ccw_device_name};
@@ -57,6 +62,24 @@ pub struct InitMount<'a> {
     options: Vec<&'a str>,
 }
 
+#[derive(Debug)]
+pub struct StorageContext<'a> {
+    cid: &'a Option<String>,
+    logger: &'a Logger,
+    sandbox: &'a Arc<Mutex<Sandbox>>,
+}
+
+/// Trait object to handle storage device.
+#[async_trait::async_trait]
+pub trait StorageHandler: Send + Sync {
+    /// Create a new storage device.
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject>;
+}
+
 #[rustfmt::skip]
 lazy_static!{
     static ref CGROUPS: HashMap<&'static str, &'static str> = {
@@ -90,17 +113,37 @@ lazy_static! {
     ];
 }
 
+#[rustfmt::skip]
+lazy_static! {
+    pub static ref STORAGE_HANDLERS: StorageHandlerManager<Arc<dyn StorageHandler>> = {
+        let mut manager: StorageHandlerManager<Arc<dyn StorageHandler>> = StorageHandlerManager::new();
+        manager.add_handler(DRIVER_9P_TYPE, Arc::new(Virtio9pHandler{})).unwrap();
+        #[cfg(target_arch = "s390x")]
+        manager.add_handler(crate::device::DRIVER_BLK_CCW_TYPE, Arc::new(VirtioBlkCcwHandler{})).unwrap();
+        manager.add_handler(DRIVER_BLK_MMIO_TYPE, Arc::new(VirtioBlkMmioHandler{})).unwrap();
+        manager.add_handler(DRIVER_BLK_PCI_TYPE, Arc::new(VirtioBlkPciHandler{})).unwrap();
+        manager.add_handler(DRIVER_EPHEMERAL_TYPE, Arc::new(EphemeralHandler{})).unwrap();
+        manager.add_handler(DRIVER_LOCAL_TYPE, Arc::new(LocalHandler{})).unwrap();
+        manager.add_handler(DRIVER_NVDIMM_TYPE, Arc::new(PmemHandler{})).unwrap();
+        manager.add_handler(DRIVER_OVERLAYFS_TYPE, Arc::new(OverlayfsHandler{})).unwrap();
+        manager.add_handler(DRIVER_SCSI_TYPE, Arc::new(ScsiHandler{})).unwrap();
+        manager.add_handler(DRIVER_VIRTIOFS_TYPE, Arc::new(VirtioFsHandler{})).unwrap();
+        manager.add_handler(DRIVER_WATCHABLE_BIND_TYPE, Arc::new(BindWatcherHandler{})).unwrap();
+        manager
+    };
+}
+
 pub const STORAGE_HANDLER_LIST: &[&str] = &[
-    DRIVER_BLK_TYPE,
-    DRIVER_9P_TYPE,
-    DRIVER_VIRTIOFS_TYPE,
-    DRIVER_EPHEMERAL_TYPE,
-    DRIVER_OVERLAYFS_TYPE,
-    DRIVER_MMIO_BLK_TYPE,
-    DRIVER_LOCAL_TYPE,
-    DRIVER_SCSI_TYPE,
-    DRIVER_NVDIMM_TYPE,
-    DRIVER_WATCHABLE_BIND_TYPE,
+    //DRIVER_BLK_TYPE,
+    //DRIVER_9P_TYPE,
+    //DRIVER_VIRTIOFS_TYPE,
+    //DRIVER_EPHEMERAL_TYPE,
+    //DRIVER_OVERLAYFS_TYPE,
+    //DRIVER_MMIO_BLK_TYPE,
+    //DRIVER_LOCAL_TYPE,
+    //DRIVER_SCSI_TYPE,
+    //DRIVER_NVDIMM_TYPE,
+    //DRIVER_WATCHABLE_BIND_TYPE,
 ];
 
 #[instrument]
@@ -161,48 +204,54 @@ pub fn baremount(
     })
 }
 
-#[instrument]
-async fn ephemeral_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    _sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    // hugetlbfs
-    if storage.fstype == FS_TYPE_HUGETLB {
-        return handle_hugetlbfs_storage(logger, storage).await;
-    }
-
-    // normal ephemeral storage
-    fs::create_dir_all(&storage.mount_point)?;
-
-    if !storage.options.is_empty() {
-        // By now we only support one option field: "fsGroup" which
-        // isn't an valid mount option, thus we should remove it when
-        // do mount.
-        let mut new_storage = storage.clone();
-        new_storage.options = Default::default();
-        common_storage_handler(logger, &new_storage)?;
-
-        let opts = parse_options(&storage.options);
-        // ephemeral_storage didn't support mount options except fsGroup.
-        if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
-            let gid = fsgid.parse::<u32>()?;
-
-            nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
-
-            let meta = fs::metadata(&storage.mount_point)?;
-            let mut permission = meta.permissions();
-
-            let o_mode = meta.mode() | MODE_SETGID;
-            permission.set_mode(o_mode);
-
-            fs::set_permissions(&storage.mount_point, permission)?;
-        }
-    } else {
-        common_storage_handler(logger, storage)?;
-    }
-
-    Ok("".to_string())
-}
+#[derive(Debug)]
+struct EphemeralHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for EphemeralHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        mut storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        // hugetlbfs
+        if storage.fstype == FS_TYPE_HUGETLB {
+            info!(ctx.logger, "handle hugetlbfs storage");
+            // Allocate hugepages before mount
+            // /sys/kernel/mm/hugepages/hugepages-1048576kB/nr_hugepages
+            // /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
+            // options eg "pagesize=2097152,size=524288000"(2M, 500M)
+            allocate_hugepages(ctx.logger, &storage.options.to_vec())
+                .context("allocate hugepages")?;
+            common_storage_handler(ctx.logger, &storage)?;
+        } else if !storage.options.is_empty() {
+            // By now we only support one option field: "fsGroup" which
+            // isn't an valid mount option, thus we should remove it when
+            // do mount.
+            let opts = parse_options(&storage.options);
+            storage.options = Default::default();
+            common_storage_handler(ctx.logger, &storage)?;
+
+            // ephemeral_storage didn't support mount options except fsGroup.
+            if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
+                let gid = fsgid.parse::<u32>()?;
+
+                nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
+
+                let meta = fs::metadata(&storage.mount_point)?;
+                let mut permission = meta.permissions();
+
+                let o_mode = meta.mode() | MODE_SETGID;
+                permission.set_mode(o_mode);
+
+                fs::set_permissions(&storage.mount_point, permission)?;
+            }
+        } else {
+            common_storage_handler(ctx.logger, &storage)?;
+        }
+
+        new_device("".to_string())
+    }
+}
 
 // update_ephemeral_mounts takes a list of ephemeral mounts and remounts them
@@ -266,101 +315,105 @@ pub async fn update_ephemeral_mounts(
     Ok(())
 }
 
-#[instrument]
-async fn overlayfs_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    cid: Option<&str>,
-    _sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    if storage
-        .options
-        .iter()
-        .any(|e| e == "io.katacontainers.fs-opt.overlay-rw")
-    {
-        let cid = cid.ok_or_else(|| anyhow!("No container id in rw overlay"))?;
-        let cpath = Path::new(crate::rpc::CONTAINER_BASE).join(cid);
-        let work = cpath.join("work");
-        let upper = cpath.join("upper");
-
-        fs::create_dir_all(&work).context("Creating overlay work directory")?;
-        fs::create_dir_all(&upper).context("Creating overlay upper directory")?;
-
-        let mut storage = storage.clone();
-        storage.fstype = "overlay".into();
-        storage
-            .options
-            .push(format!("upperdir={}", upper.to_string_lossy()));
-        storage
-            .options
-            .push(format!("workdir={}", work.to_string_lossy()));
-        return common_storage_handler(logger, &storage);
-    }
-
-    common_storage_handler(logger, storage)
-}
-
-#[instrument]
-async fn local_storage_handler(
-    _logger: &Logger,
-    storage: &Storage,
-    _sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    fs::create_dir_all(&storage.mount_point).context(format!(
-        "failed to create dir all {:?}",
-        &storage.mount_point
-    ))?;
-
-    let opts = parse_options(&storage.options);
-
-    let mut need_set_fsgid = false;
-    if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
-        let gid = fsgid.parse::<u32>()?;
-
-        nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
-        need_set_fsgid = true;
-    }
-
-    if let Some(mode) = opts.get("mode") {
-        let mut permission = fs::metadata(&storage.mount_point)?.permissions();
-
-        let mut o_mode = u32::from_str_radix(mode, 8)?;
-
-        if need_set_fsgid {
-            // set SetGid mode mask.
-            o_mode |= MODE_SETGID;
-        }
-        permission.set_mode(o_mode);
-
-        fs::set_permissions(&storage.mount_point, permission)?;
-    }
-
-    Ok("".to_string())
-}
-
-#[instrument]
-async fn virtio9p_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    _sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    common_storage_handler(logger, storage)
-}
-
-#[instrument]
-async fn handle_hugetlbfs_storage(logger: &Logger, storage: &Storage) -> Result<String> {
-    info!(logger, "handle hugetlbfs storage");
-    // Allocate hugepages before mount
-    // /sys/kernel/mm/hugepages/hugepages-1048576kB/nr_hugepages
-    // /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
-    // options eg "pagesize=2097152,size=524288000"(2M, 500M)
-    allocate_hugepages(logger, &storage.options).context("allocate hugepages")?;
-
-    common_storage_handler(logger, storage)?;
-
-    // hugetlbfs return empty string as ephemeral_storage_handler do.
-    // this is a sandbox level storage, but not a container-level mount.
-    Ok("".to_string())
-}
+#[derive(Debug)]
+struct OverlayfsHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for OverlayfsHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        mut storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        if storage
+            .options
+            .iter()
+            .any(|e| e == "io.katacontainers.fs-opt.overlay-rw")
+        {
+            let cid = ctx
+                .cid
+                .clone()
+                .ok_or_else(|| anyhow!("No container id in rw overlay"))?;
+            let cpath = Path::new(crate::rpc::CONTAINER_BASE).join(cid);
+            let work = cpath.join("work");
+            let upper = cpath.join("upper");
+
+            fs::create_dir_all(&work).context("Creating overlay work directory")?;
+            fs::create_dir_all(&upper).context("Creating overlay upper directory")?;
+
+            storage.fstype = "overlay".into();
+            storage
+                .options
+                .push(format!("upperdir={}", upper.to_string_lossy()));
+            storage
+                .options
+                .push(format!("workdir={}", work.to_string_lossy()));
+        }
+
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct LocalHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for LocalHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        _ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        fs::create_dir_all(&storage.mount_point).context(format!(
+            "failed to create dir all {:?}",
+            &storage.mount_point
+        ))?;
+
+        let opts = parse_options(&storage.options);
+
+        let mut need_set_fsgid = false;
+        if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
+            let gid = fsgid.parse::<u32>()?;
+
+            nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
+            need_set_fsgid = true;
+        }
+
+        if let Some(mode) = opts.get("mode") {
+            let mut permission = fs::metadata(&storage.mount_point)?.permissions();
+
+            let mut o_mode = u32::from_str_radix(mode, 8)?;
+
+            if need_set_fsgid {
+                // set SetGid mode mask.
+                o_mode |= MODE_SETGID;
+            }
+            permission.set_mode(o_mode);
+
+            fs::set_permissions(&storage.mount_point, permission)?;
+        }
+
+        new_device("".to_string())
+    }
+}
+
+#[derive(Debug)]
+struct Virtio9pHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for Virtio9pHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
 
 // Allocate hugepages by writing to sysfs
@@ -451,98 +504,169 @@ fn get_pagesize_and_size_from_option(options: &[String]) -> Result<(u64, u64)> {
     Ok((pagesize, size))
 }
 
-// virtiommio_blk_storage_handler handles the storage for mmio blk driver.
-#[instrument]
-async fn virtiommio_blk_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    let storage = storage.clone();
-    if !Path::new(&storage.source).exists() {
-        get_virtio_mmio_device_name(sandbox, &storage.source)
-            .await
-            .context("failed to get mmio device name")?;
-    }
-    //The source path is VmPath
-    common_storage_handler(logger, &storage)
-}
-
-// virtiofs_storage_handler handles the storage for virtio-fs.
-#[instrument]
-async fn virtiofs_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    _sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    common_storage_handler(logger, storage)
-}
-
-// virtio_blk_storage_handler handles the storage for blk driver.
-#[instrument]
-async fn virtio_blk_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    let mut storage = storage.clone();
-    // If hot-plugged, get the device node path based on the PCI path
-    // otherwise use the virt path provided in Storage Source
-    if storage.source.starts_with("/dev") {
-        let metadata = fs::metadata(&storage.source)
-            .context(format!("get metadata on file {:?}", &storage.source))?;
-        let mode = metadata.permissions().mode();
-        if mode & libc::S_IFBLK == 0 {
-            return Err(anyhow!("Invalid device {}", &storage.source));
-        }
-    } else {
-        let pcipath = pci::Path::from_str(&storage.source)?;
-        let dev_path = get_virtio_blk_pci_device_name(sandbox, &pcipath).await?;
-        storage.source = dev_path;
-    }
-
-    common_storage_handler(logger, &storage)
-}
-
-// virtio_blk_ccw_storage_handler handles storage for the blk-ccw driver (s390x)
-#[cfg(target_arch = "s390x")]
-#[instrument]
-async fn virtio_blk_ccw_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    let mut storage = storage.clone();
-    let ccw_device = ccw::Device::from_str(&storage.source)?;
-    let dev_path = get_virtio_blk_ccw_device_name(sandbox, &ccw_device).await?;
-    storage.source = dev_path;
-    common_storage_handler(logger, &storage)
-}
-
-#[cfg(not(target_arch = "s390x"))]
-#[instrument]
-async fn virtio_blk_ccw_storage_handler(
-    _: &Logger,
-    _: &Storage,
-    _: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    Err(anyhow!("CCW is only supported on s390x"))
-}
-
-// virtio_scsi_storage_handler handles the storage for scsi driver.
-#[instrument]
-async fn virtio_scsi_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    let mut storage = storage.clone();
-
-    // Retrieve the device path from SCSI address.
-    let dev_path = get_scsi_device_name(sandbox, &storage.source).await?;
-    storage.source = dev_path;
-
-    common_storage_handler(logger, &storage)
-}
+#[derive(Debug)]
+struct VirtioFsHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for VirtioFsHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct VirtioBlkMmioHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for VirtioBlkMmioHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        if !Path::new(&storage.source).exists() {
+            get_virtio_mmio_device_name(ctx.sandbox, &storage.source)
+                .await
+                .context("failed to get mmio device name")?;
+        }
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct VirtioBlkPciHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for VirtioBlkPciHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        mut storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        // If hot-plugged, get the device node path based on the PCI path
+        // otherwise use the virt path provided in Storage Source
+        if storage.source.starts_with("/dev") {
+            let metadata = fs::metadata(&storage.source)
+                .context(format!("get metadata on file {:?}", &storage.source))?;
+            let mode = metadata.permissions().mode();
+            if mode & libc::S_IFBLK == 0 {
+                return Err(anyhow!("Invalid device {}", &storage.source));
+            }
+        } else {
+            let pcipath = pci::Path::from_str(&storage.source)?;
+            let dev_path = get_virtio_blk_pci_device_name(ctx.sandbox, &pcipath).await?;
+            storage.source = dev_path;
+        }
+
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct VirtioBlkCcwHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for VirtioBlkCcwHandler {
+    #[cfg(target_arch = "s390x")]
+    #[instrument]
+    async fn create_device(
+        &self,
+        mut storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        let ccw_device = ccw::Device::from_str(&storage.source)?;
+        let dev_path = get_virtio_blk_ccw_device_name(ctx.sandbox, &ccw_device).await?;
+        storage.source = dev_path;
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+
+    #[cfg(not(target_arch = "s390x"))]
+    #[instrument]
+    async fn create_device(
+        &self,
+        _storage: Storage,
+        _ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        Err(anyhow!("CCW is only supported on s390x"))
+    }
+}
+
+#[derive(Debug)]
+struct ScsiHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for ScsiHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        mut storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        // Retrieve the device path from SCSI address.
+        let dev_path = get_scsi_device_name(ctx.sandbox, &storage.source).await?;
+        storage.source = dev_path;
+
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct PmemHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for PmemHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        // Retrieve the device path from NVDIMM address.
+        wait_for_pmem_device(ctx.sandbox, &storage.source).await?;
+
+        let path = common_storage_handler(ctx.logger, &storage)?;
+        new_device(path)
+    }
+}
+
+#[derive(Debug)]
+struct BindWatcherHandler {}
+
+#[async_trait::async_trait]
+impl StorageHandler for BindWatcherHandler {
+    #[instrument]
+    async fn create_device(
+        &self,
+        storage: Storage,
+        ctx: &mut StorageContext,
+    ) -> Result<StorageDeviceObject> {
+        if let Some(cid) = ctx.cid {
+            ctx.sandbox
+                .lock()
+                .await
+                .bind_watcher
+                .add_container(cid.to_string(), iter::once(storage.clone()), ctx.logger)
+                .await?;
+        }
+        new_device("".to_string())
+    }
+}
+
+fn new_device(path: String) -> Result<StorageDeviceObject> {
+    let device = StorageDeviceGeneric::new(path);
+    Ok(Arc::new(Mutex::new(device)))
+}
 
 #[instrument]
@@ -552,39 +676,6 @@ fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String>
     Ok(storage.mount_point.clone())
 }
 
-// nvdimm_storage_handler handles the storage for NVDIMM driver.
-#[instrument]
-async fn nvdimm_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-) -> Result<String> {
-    let storage = storage.clone();
-
-    // Retrieve the device path from NVDIMM address.
-    wait_for_pmem_device(sandbox, &storage.source).await?;
-
-    common_storage_handler(logger, &storage)
-}
-
-async fn bind_watcher_storage_handler(
-    logger: &Logger,
-    storage: &Storage,
-    sandbox: &Arc<Mutex<Sandbox>>,
-    cid: Option<String>,
-) -> Result<()> {
-    let mut locked = sandbox.lock().await;
-
-    if let Some(cid) = cid {
-        locked
-            .bind_watcher
-            .add_container(cid, iter::once(storage.clone()), logger)
-            .await
-    } else {
-        Ok(())
-    }
-}
-
 // mount_storage performs the mount described by the storage structure.
 #[instrument]
 fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
@@ -728,68 +819,67 @@ pub fn is_mounted(mount_point: &str) -> Result<bool> {
 #[instrument]
 pub async fn add_storages(
     logger: Logger,
-    storages: &[Storage],
+    storages: Vec<Storage>,
     sandbox: &Arc<Mutex<Sandbox>>,
     cid: Option<String>,
 ) -> Result<Vec<String>> {
     let mut mount_list = Vec::new();
 
     for storage in storages {
-        let handler_name = &storage.driver;
-        let logger =
-            logger.new(o!( "subsystem" => "storage", "storage-type" => handler_name.to_string()));
-
-        {
-            let mut sb = sandbox.lock().await;
-            let state = sb.add_sandbox_storage(&storage.mount_point).await;
-            if state.ref_count().await > 1 {
-                continue;
-            }
-        }
-
-        let res = match handler_name.as_str() {
-            DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_OVERLAYFS_TYPE => {
-                overlayfs_storage_handler(&logger, storage, cid.as_deref(), sandbox).await
-            }
-            DRIVER_MMIO_BLK_TYPE => virtiommio_blk_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_LOCAL_TYPE => local_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, storage, sandbox).await,
-            DRIVER_WATCHABLE_BIND_TYPE => {
-                bind_watcher_storage_handler(&logger, storage, sandbox, cid.clone()).await?;
-                // Don't register watch mounts, they're handled separately by the watcher.
-                Ok(String::new())
-            }
-            _ => {
-                return Err(anyhow!(
-                    "Failed to find the storage handler {}",
-                    storage.driver.to_owned()
-                ));
-            }
-        };
-
-        let mount_point = match res {
-            Err(e) => {
-                error!(
-                    logger,
-                    "add_storages failed, storage: {:?}, error: {:?} ", storage, e
-                );
-                let mut sb = sandbox.lock().await;
-                if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await {
-                    warn!(logger, "fail to unset sandbox storage {:?}", e);
-                }
-                return Err(e);
-            }
-            Ok(m) => m,
-        };
-
-        if !mount_point.is_empty() {
-            mount_list.push(mount_point);
-        }
+        let path = storage.mount_point.clone();
+        let state = sandbox.lock().await.add_sandbox_storage(&path).await;
+        if state.ref_count().await > 1 {
+            // The device already exists.
+            continue;
+        }
+
+        if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
+            let logger =
+                logger.new(o!( "subsystem" => "storage", "storage-type" => storage.driver.clone()));
+            let mut ctx = StorageContext {
+                cid: &cid,
+                logger: &logger,
+                sandbox,
+            };
+
+            match handler.create_device(storage, &mut ctx).await {
+                Ok(device) => {
+                    match sandbox
+                        .lock()
+                        .await
+                        .update_sandbox_storage(&path, device.clone())
+                    {
+                        Ok(d) => {
+                            let path = device.lock().await.path().to_string();
+                            if !path.is_empty() {
+                                mount_list.push(path.clone());
+                            }
+                            drop(d);
+                        }
+                        Err(device) => {
+                            error!(logger, "failed to update device for storage");
+                            if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await
+                            {
+                                warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
+                            }
+                            device.lock().await.cleanup();
+                            return Err(anyhow!("failed to update device for storage"));
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!(logger, "failed to create device for storage, error: {e:?}");
+                    if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
+                        warn!(logger, "failed to remove dummy sandbox storage {e:?}");
+                    }
+                    return Err(e);
+                }
+            }
+        } else {
+            return Err(anyhow!(
+                "Failed to find the storage handler {}",
+                storage.driver
+            ));
+        }
     }


@@ -243,13 +243,7 @@ impl AgentService {
         // After all those storages have been processed, no matter the order
         // here, the agent will rely on rustjail (using the oci.Mounts
        // list) to bind mount all of them inside the container.
-        let m = add_storages(
-            sl(),
-            &req.storages,
-            &self.sandbox,
-            Some(req.container_id.clone()),
-        )
-        .await?;
+        let m = add_storages(sl(), req.storages, &self.sandbox, Some(req.container_id)).await?;
 
         let mut s = self.sandbox.lock().await;
         s.container_mounts.insert(cid.clone(), m);
@@ -1196,7 +1190,7 @@ impl agent_ttrpc::AgentService for AgentService {
             s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
         }
 
-        let m = add_storages(sl(), &req.storages, &self.sandbox, None)
+        let m = add_storages(sl(), req.storages, &self.sandbox, None)
             .await
             .map_ttrpc_err(same)?;
         self.sandbox.lock().await.mounts = m;