
agent: switch to new storage subsystem

Switch to the new storage subsystem, which creates a StorageDevice for
each storage object.

Fixes: 

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
Jiang Liu 2023-08-14 20:26:58 +08:00
parent fcbda0b419
commit 60ca12ccb0
3 changed files with 389 additions and 305 deletions
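In outline, the change replaces the per-driver `match` in `add_storages` with a registry of trait objects: each driver name maps to an `Arc<dyn StorageHandler>`, and the handler's `create_device` builds a `StorageDeviceObject` for the storage entry. A minimal, self-contained sketch of the pattern (simplified synchronous types, not the agent's real async ones):

```rust
use std::{collections::HashMap, sync::Arc};

// Simplified stand-in for the agent's async StorageHandler trait.
trait StorageHandler: Send + Sync {
    fn create_device(&self, source: &str) -> Result<String, String>;
}

struct VirtioBlkPciHandler;
impl StorageHandler for VirtioBlkPciHandler {
    fn create_device(&self, source: &str) -> Result<String, String> {
        // Placeholder: the real handler resolves a PCI path to a /dev node.
        Ok(format!("resolved:{source}"))
    }
}

fn main() {
    // Registration, as in the STORAGE_HANDLERS table in the diff below.
    let mut handlers: HashMap<&str, Arc<dyn StorageHandler>> = HashMap::new();
    handlers.insert("blk", Arc::new(VirtioBlkPciHandler));

    // Dispatch by driver name, as the reworked add_storages does.
    let dev = handlers.get("blk").unwrap().create_device("01/02").unwrap();
    println!("{dev}");
}
```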

View File

@@ -35,9 +35,9 @@ const VM_ROOTFS: &str = "/";
const BLOCK: &str = "block";
pub const DRIVER_9P_TYPE: &str = "9p";
pub const DRIVER_VIRTIOFS_TYPE: &str = "virtio-fs";
pub const DRIVER_BLK_TYPE: &str = "blk";
pub const DRIVER_BLK_PCI_TYPE: &str = "blk";
pub const DRIVER_BLK_CCW_TYPE: &str = "blk-ccw";
pub const DRIVER_MMIO_BLK_TYPE: &str = "mmioblk";
pub const DRIVER_BLK_MMIO_TYPE: &str = "mmioblk";
pub const DRIVER_SCSI_TYPE: &str = "scsi";
pub const DRIVER_NVDIMM_TYPE: &str = "nvdimm";
pub const DRIVER_EPHEMERAL_TYPE: &str = "ephemeral";
@@ -935,9 +935,9 @@ async fn add_device(device: &Device, sandbox: &Arc<Mutex<Sandbox>>) -> Result<Sp
}
match device.type_.as_str() {
DRIVER_BLK_TYPE => virtio_blk_device_handler(device, sandbox).await,
DRIVER_BLK_PCI_TYPE => virtio_blk_device_handler(device, sandbox).await,
DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_device_handler(device, sandbox).await,
DRIVER_MMIO_BLK_TYPE => virtiommio_blk_device_handler(device, sandbox).await,
DRIVER_BLK_MMIO_TYPE => virtiommio_blk_device_handler(device, sandbox).await,
DRIVER_NVDIMM_TYPE => virtio_nvdimm_device_handler(device, sandbox).await,
DRIVER_SCSI_TYPE => virtio_scsi_device_handler(device, sandbox).await,
DRIVER_VFIO_PCI_GK_TYPE | DRIVER_VFIO_PCI_TYPE => {

View File

@@ -3,6 +3,8 @@
// SPDX-License-Identifier: Apache-2.0
//
#![allow(dead_code)]
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::{self, File, OpenOptions};
@@ -16,7 +18,10 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use kata_sys_util::mount::{create_mount_destination, get_linux_mount_info, parse_mount_options};
use kata_types::mount::{KATA_MOUNT_OPTION_FS_GID, KATA_SHAREDFS_GUEST_PREMOUNT_TAG};
use kata_types::mount::{
StorageDeviceGeneric, StorageHandlerManager, KATA_MOUNT_OPTION_FS_GID,
KATA_SHAREDFS_GUEST_PREMOUNT_TAG,
};
use nix::mount::MsFlags;
use nix::unistd::{Gid, Uid};
use regex::Regex;
@@ -26,15 +31,15 @@ use tracing::instrument;
use crate::device::{
get_scsi_device_name, get_virtio_blk_pci_device_name, get_virtio_mmio_device_name,
online_device, wait_for_pmem_device, DRIVER_9P_TYPE, DRIVER_BLK_CCW_TYPE, DRIVER_BLK_TYPE,
DRIVER_EPHEMERAL_TYPE, DRIVER_LOCAL_TYPE, DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE,
DRIVER_OVERLAYFS_TYPE, DRIVER_SCSI_TYPE, DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE,
FS_TYPE_HUGETLB,
online_device, wait_for_pmem_device, DRIVER_9P_TYPE, DRIVER_BLK_MMIO_TYPE, DRIVER_BLK_PCI_TYPE,
DRIVER_EPHEMERAL_TYPE, DRIVER_LOCAL_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_OVERLAYFS_TYPE,
DRIVER_SCSI_TYPE, DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE, FS_TYPE_HUGETLB,
};
use crate::linux_abi::*;
use crate::pci;
use crate::protocols::agent::Storage;
use crate::protocols::types::FSGroupChangePolicy;
use crate::sandbox::StorageDeviceObject;
use crate::Sandbox;
#[cfg(target_arch = "s390x")]
use crate::{ccw, device::get_virtio_blk_ccw_device_name};
@@ -57,6 +62,24 @@ pub struct InitMount<'a> {
options: Vec<&'a str>,
}
#[derive(Debug)]
pub struct StorageContext<'a> {
cid: &'a Option<String>,
logger: &'a Logger,
sandbox: &'a Arc<Mutex<Sandbox>>,
}
/// Trait object to handle storage device.
#[async_trait::async_trait]
pub trait StorageHandler: Send + Sync {
/// Create a new storage device.
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject>;
}
#[rustfmt::skip]
lazy_static!{
static ref CGROUPS: HashMap<&'static str, &'static str> = {
@@ -90,17 +113,37 @@ lazy_static! {
];
}
#[rustfmt::skip]
lazy_static! {
pub static ref STORAGE_HANDLERS: StorageHandlerManager<Arc<dyn StorageHandler>> = {
let mut manager: StorageHandlerManager<Arc<dyn StorageHandler>> = StorageHandlerManager::new();
manager.add_handler(DRIVER_9P_TYPE, Arc::new(Virtio9pHandler{})).unwrap();
#[cfg(target_arch = "s390x")]
manager.add_handler(crate::device::DRIVER_BLK_CCW_TYPE, Arc::new(VirtioBlkCcwHandler{})).unwrap();
manager.add_handler(DRIVER_BLK_MMIO_TYPE, Arc::new(VirtioBlkMmioHandler{})).unwrap();
manager.add_handler(DRIVER_BLK_PCI_TYPE, Arc::new(VirtioBlkPciHandler{})).unwrap();
manager.add_handler(DRIVER_EPHEMERAL_TYPE, Arc::new(EphemeralHandler{})).unwrap();
manager.add_handler(DRIVER_LOCAL_TYPE, Arc::new(LocalHandler{})).unwrap();
manager.add_handler(DRIVER_NVDIMM_TYPE, Arc::new(PmemHandler{})).unwrap();
manager.add_handler(DRIVER_OVERLAYFS_TYPE, Arc::new(OverlayfsHandler{})).unwrap();
manager.add_handler(DRIVER_SCSI_TYPE, Arc::new(ScsiHandler{})).unwrap();
manager.add_handler(DRIVER_VIRTIOFS_TYPE, Arc::new(VirtioFsHandler{})).unwrap();
manager.add_handler(DRIVER_WATCHABLE_BIND_TYPE, Arc::new(BindWatcherHandler{})).unwrap();
manager
};
}
pub const STORAGE_HANDLER_LIST: &[&str] = &[
DRIVER_BLK_TYPE,
DRIVER_9P_TYPE,
DRIVER_VIRTIOFS_TYPE,
DRIVER_EPHEMERAL_TYPE,
DRIVER_OVERLAYFS_TYPE,
DRIVER_MMIO_BLK_TYPE,
DRIVER_LOCAL_TYPE,
DRIVER_SCSI_TYPE,
DRIVER_NVDIMM_TYPE,
DRIVER_WATCHABLE_BIND_TYPE,
//DRIVER_BLK_TYPE,
//DRIVER_9P_TYPE,
//DRIVER_VIRTIOFS_TYPE,
//DRIVER_EPHEMERAL_TYPE,
//DRIVER_OVERLAYFS_TYPE,
//DRIVER_MMIO_BLK_TYPE,
//DRIVER_LOCAL_TYPE,
//DRIVER_SCSI_TYPE,
//DRIVER_NVDIMM_TYPE,
//DRIVER_WATCHABLE_BIND_TYPE,
];
#[instrument]
@@ -161,48 +204,54 @@ pub fn baremount(
})
}
#[instrument]
async fn ephemeral_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
// hugetlbfs
if storage.fstype == FS_TYPE_HUGETLB {
return handle_hugetlbfs_storage(logger, storage).await;
}
#[derive(Debug)]
struct EphemeralHandler {}
// normal ephemeral storage
fs::create_dir_all(&storage.mount_point)?;
#[async_trait::async_trait]
impl StorageHandler for EphemeralHandler {
#[instrument]
async fn create_device(
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
// hugetlbfs
if storage.fstype == FS_TYPE_HUGETLB {
info!(ctx.logger, "handle hugetlbfs storage");
// Allocate hugepages before mount
// /sys/kernel/mm/hugepages/hugepages-1048576kB/nr_hugepages
// /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
// options eg "pagesize=2097152,size=524288000"(2M, 500M)
allocate_hugepages(ctx.logger, &storage.options.to_vec())
.context("allocate hugepages")?;
common_storage_handler(ctx.logger, &storage)?;
} else if !storage.options.is_empty() {
// For now we only support one option field, "fsGroup", which
// isn't a valid mount option, so we must remove it before
// doing the mount.
let opts = parse_options(&storage.options);
storage.options = Default::default();
common_storage_handler(ctx.logger, &storage)?;
if !storage.options.is_empty() {
// For now we only support one option field, "fsGroup", which
// isn't a valid mount option, so we must remove it before
// doing the mount.
let mut new_storage = storage.clone();
new_storage.options = Default::default();
common_storage_handler(logger, &new_storage)?;
// Ephemeral storage doesn't support any mount option except fsGroup.
if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
let gid = fsgid.parse::<u32>()?;
let opts = parse_options(&storage.options);
nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
// Ephemeral storage doesn't support any mount option except fsGroup.
if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
let gid = fsgid.parse::<u32>()?;
let meta = fs::metadata(&storage.mount_point)?;
let mut permission = meta.permissions();
nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
let meta = fs::metadata(&storage.mount_point)?;
let mut permission = meta.permissions();
let o_mode = meta.mode() | MODE_SETGID;
permission.set_mode(o_mode);
fs::set_permissions(&storage.mount_point, permission)?;
let o_mode = meta.mode() | MODE_SETGID;
permission.set_mode(o_mode);
fs::set_permissions(&storage.mount_point, permission)?;
}
} else {
common_storage_handler(ctx.logger, &storage)?;
}
} else {
common_storage_handler(logger, storage)?;
}
Ok("".to_string())
new_device("".to_string())
}
}
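As the comments in the hugetlbfs branch above note, the options encode a page size and a total size in bytes: "pagesize=2097152,size=524288000" means 2 MiB pages and 500 MiB in total, so 524288000 / 2097152 = 250 pages must be reserved via the matching nr_hugepages sysfs file. A self-contained sketch of that option arithmetic (the nr_hugepages helper below is illustrative, not the agent's allocate_hugepages):

```rust
// Derive (sysfs path, page count) from Kata-style hugepage mount options.
fn nr_hugepages(options: &[&str]) -> Option<(String, u64)> {
    let (mut pagesize, mut size) = (None, None);
    for opt in options {
        for kv in opt.split(',') {
            match kv.split_once('=') {
                Some(("pagesize", v)) => pagesize = v.parse::<u64>().ok(),
                Some(("size", v)) => size = v.parse::<u64>().ok(),
                _ => {}
            }
        }
    }
    let (pagesize, size) = (pagesize?, size?);
    // e.g. /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
    let sysfs = format!(
        "/sys/kernel/mm/hugepages/hugepages-{}kB/nr_hugepages",
        pagesize / 1024
    );
    Some((sysfs, size / pagesize)) // 524288000 / 2097152 = 250
}

fn main() {
    let (path, pages) = nr_hugepages(&["pagesize=2097152,size=524288000"]).unwrap();
    assert_eq!(pages, 250);
    assert!(path.ends_with("hugepages-2048kB/nr_hugepages"));
}
```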
// update_ephemeral_mounts takes a list of ephemeral mounts and remounts them
@@ -266,101 +315,105 @@ pub async fn update_ephemeral_mounts(
Ok(())
}
#[instrument]
async fn overlayfs_storage_handler(
logger: &Logger,
storage: &Storage,
cid: Option<&str>,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
if storage
.options
.iter()
.any(|e| e == "io.katacontainers.fs-opt.overlay-rw")
{
let cid = cid.ok_or_else(|| anyhow!("No container id in rw overlay"))?;
let cpath = Path::new(crate::rpc::CONTAINER_BASE).join(cid);
let work = cpath.join("work");
let upper = cpath.join("upper");
#[derive(Debug)]
struct OverlayfsHandler {}
fs::create_dir_all(&work).context("Creating overlay work directory")?;
fs::create_dir_all(&upper).context("Creating overlay upper directory")?;
let mut storage = storage.clone();
storage.fstype = "overlay".into();
storage
#[async_trait::async_trait]
impl StorageHandler for OverlayfsHandler {
#[instrument]
async fn create_device(
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
if storage
.options
.push(format!("upperdir={}", upper.to_string_lossy()));
storage
.options
.push(format!("workdir={}", work.to_string_lossy()));
return common_storage_handler(logger, &storage);
}
.iter()
.any(|e| e == "io.katacontainers.fs-opt.overlay-rw")
{
let cid = ctx
.cid
.clone()
.ok_or_else(|| anyhow!("No container id in rw overlay"))?;
let cpath = Path::new(crate::rpc::CONTAINER_BASE).join(cid);
let work = cpath.join("work");
let upper = cpath.join("upper");
common_storage_handler(logger, storage)
}
fs::create_dir_all(&work).context("Creating overlay work directory")?;
fs::create_dir_all(&upper).context("Creating overlay upper directory")?;
#[instrument]
async fn local_storage_handler(
_logger: &Logger,
storage: &Storage,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
fs::create_dir_all(&storage.mount_point).context(format!(
"failed to create dir all {:?}",
&storage.mount_point
))?;
let opts = parse_options(&storage.options);
let mut need_set_fsgid = false;
if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
let gid = fsgid.parse::<u32>()?;
nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
need_set_fsgid = true;
}
if let Some(mode) = opts.get("mode") {
let mut permission = fs::metadata(&storage.mount_point)?.permissions();
let mut o_mode = u32::from_str_radix(mode, 8)?;
if need_set_fsgid {
// set SetGid mode mask.
o_mode |= MODE_SETGID;
storage.fstype = "overlay".into();
storage
.options
.push(format!("upperdir={}", upper.to_string_lossy()));
storage
.options
.push(format!("workdir={}", work.to_string_lossy()));
}
permission.set_mode(o_mode);
fs::set_permissions(&storage.mount_point, permission)?;
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
Ok("".to_string())
}
#[instrument]
async fn virtio9p_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
common_storage_handler(logger, storage)
#[derive(Debug)]
struct LocalHandler {}
#[async_trait::async_trait]
impl StorageHandler for LocalHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
_ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
fs::create_dir_all(&storage.mount_point).context(format!(
"failed to create dir all {:?}",
&storage.mount_point
))?;
let opts = parse_options(&storage.options);
let mut need_set_fsgid = false;
if let Some(fsgid) = opts.get(KATA_MOUNT_OPTION_FS_GID) {
let gid = fsgid.parse::<u32>()?;
nix::unistd::chown(storage.mount_point.as_str(), None, Some(Gid::from_raw(gid)))?;
need_set_fsgid = true;
}
if let Some(mode) = opts.get("mode") {
let mut permission = fs::metadata(&storage.mount_point)?.permissions();
let mut o_mode = u32::from_str_radix(mode, 8)?;
if need_set_fsgid {
// set SetGid mode mask.
o_mode |= MODE_SETGID;
}
permission.set_mode(o_mode);
fs::set_permissions(&storage.mount_point, permission)?;
}
new_device("".to_string())
}
}
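In LocalHandler, the "mode" option is parsed as octal and, when a fsGroup gid was applied, the setgid bit is OR'd in so files created under the mount point inherit that group. A tiny self-contained sketch of the bit math (assuming MODE_SETGID is the conventional S_ISGID bit, 0o2000):

```rust
// Assumption: MODE_SETGID == 0o2000 (the standard S_ISGID bit).
const MODE_SETGID: u32 = 0o2000;

fn local_mode(mode_opt: &str, fs_gid_set: bool) -> Result<u32, std::num::ParseIntError> {
    let mut o_mode = u32::from_str_radix(mode_opt, 8)?;
    if fs_gid_set {
        // Set the setgid bit so new files inherit the fsGroup gid.
        o_mode |= MODE_SETGID;
    }
    Ok(o_mode)
}

fn main() {
    assert_eq!(local_mode("0777", true).unwrap(), 0o2777);
    assert_eq!(local_mode("0755", false).unwrap(), 0o755);
}
```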
#[instrument]
async fn handle_hugetlbfs_storage(logger: &Logger, storage: &Storage) -> Result<String> {
info!(logger, "handle hugetlbfs storage");
// Allocate hugepages before mount
// /sys/kernel/mm/hugepages/hugepages-1048576kB/nr_hugepages
// /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages
// options eg "pagesize=2097152,size=524288000"(2M, 500M)
allocate_hugepages(logger, &storage.options).context("allocate hugepages")?;
#[derive(Debug)]
struct Virtio9pHandler {}
common_storage_handler(logger, storage)?;
// hugetlbfs returns an empty string, as ephemeral_storage_handler does:
// this is sandbox-level storage, not a container-level mount.
Ok("".to_string())
#[async_trait::async_trait]
impl StorageHandler for Virtio9pHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
}
// Allocate hugepages by writing to sysfs
@@ -451,98 +504,169 @@ fn get_pagesize_and_size_from_option(options: &[String]) -> Result<(u64, u64)> {
Ok((pagesize, size))
}
// virtiommio_blk_storage_handler handles the storage for mmio blk driver.
#[instrument]
async fn virtiommio_blk_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let storage = storage.clone();
if !Path::new(&storage.source).exists() {
get_virtio_mmio_device_name(sandbox, &storage.source)
.await
.context("failed to get mmio device name")?;
#[derive(Debug)]
struct VirtioFsHandler {}
#[async_trait::async_trait]
impl StorageHandler for VirtioFsHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
// The source path is VmPath
common_storage_handler(logger, &storage)
}
// virtiofs_storage_handler handles the storage for virtio-fs.
#[instrument]
async fn virtiofs_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
common_storage_handler(logger, storage)
}
#[derive(Debug)]
struct VirtioBlkMmioHandler {}
// virtio_blk_storage_handler handles the storage for blk driver.
#[instrument]
async fn virtio_blk_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
// If hot-plugged, get the device node path based on the PCI path
// otherwise use the virt path provided in Storage Source
if storage.source.starts_with("/dev") {
let metadata = fs::metadata(&storage.source)
.context(format!("get metadata on file {:?}", &storage.source))?;
let mode = metadata.permissions().mode();
if mode & libc::S_IFBLK == 0 {
return Err(anyhow!("Invalid device {}", &storage.source));
#[async_trait::async_trait]
impl StorageHandler for VirtioBlkMmioHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
if !Path::new(&storage.source).exists() {
get_virtio_mmio_device_name(ctx.sandbox, &storage.source)
.await
.context("failed to get mmio device name")?;
}
} else {
let pcipath = pci::Path::from_str(&storage.source)?;
let dev_path = get_virtio_blk_pci_device_name(sandbox, &pcipath).await?;
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
}
#[derive(Debug)]
struct VirtioBlkPciHandler {}
#[async_trait::async_trait]
impl StorageHandler for VirtioBlkPciHandler {
#[instrument]
async fn create_device(
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
// If hot-plugged, get the device node path based on the PCI path
// otherwise use the virt path provided in Storage Source
if storage.source.starts_with("/dev") {
let metadata = fs::metadata(&storage.source)
.context(format!("get metadata on file {:?}", &storage.source))?;
let mode = metadata.permissions().mode();
if mode & libc::S_IFBLK == 0 {
return Err(anyhow!("Invalid device {}", &storage.source));
}
} else {
let pcipath = pci::Path::from_str(&storage.source)?;
let dev_path = get_virtio_blk_pci_device_name(ctx.sandbox, &pcipath).await?;
storage.source = dev_path;
}
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
}
#[derive(Debug)]
struct VirtioBlkCcwHandler {}
#[async_trait::async_trait]
impl StorageHandler for VirtioBlkCcwHandler {
#[cfg(target_arch = "s390x")]
#[instrument]
async fn create_device(
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
let ccw_device = ccw::Device::from_str(&storage.source)?;
let dev_path = get_virtio_blk_ccw_device_name(ctx.sandbox, &ccw_device).await?;
storage.source = dev_path;
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
common_storage_handler(logger, &storage)
#[cfg(not(target_arch = "s390x"))]
#[instrument]
async fn create_device(
&self,
_storage: Storage,
_ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
Err(anyhow!("CCW is only supported on s390x"))
}
}
// virtio_blk_ccw_storage_handler handles storage for the blk-ccw driver (s390x)
#[cfg(target_arch = "s390x")]
#[instrument]
async fn virtio_blk_ccw_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
let ccw_device = ccw::Device::from_str(&storage.source)?;
let dev_path = get_virtio_blk_ccw_device_name(sandbox, &ccw_device).await?;
storage.source = dev_path;
common_storage_handler(logger, &storage)
#[derive(Debug)]
struct ScsiHandler {}
#[async_trait::async_trait]
impl StorageHandler for ScsiHandler {
#[instrument]
async fn create_device(
&self,
mut storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
// Retrieve the device path from the SCSI address.
let dev_path = get_scsi_device_name(ctx.sandbox, &storage.source).await?;
storage.source = dev_path;
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
}
#[cfg(not(target_arch = "s390x"))]
#[instrument]
async fn virtio_blk_ccw_storage_handler(
_: &Logger,
_: &Storage,
_: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
Err(anyhow!("CCW is only supported on s390x"))
#[derive(Debug)]
struct PmemHandler {}
#[async_trait::async_trait]
impl StorageHandler for PmemHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
// Wait for the NVDIMM device at the given address.
wait_for_pmem_device(ctx.sandbox, &storage.source).await?;
let path = common_storage_handler(ctx.logger, &storage)?;
new_device(path)
}
}
// virtio_scsi_storage_handler handles the storage for scsi driver.
#[instrument]
async fn virtio_scsi_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
#[derive(Debug)]
struct BindWatcherHandler {}
// Retrieve the device path from the SCSI address.
let dev_path = get_scsi_device_name(sandbox, &storage.source).await?;
storage.source = dev_path;
#[async_trait::async_trait]
impl StorageHandler for BindWatcherHandler {
#[instrument]
async fn create_device(
&self,
storage: Storage,
ctx: &mut StorageContext,
) -> Result<StorageDeviceObject> {
if let Some(cid) = ctx.cid {
ctx.sandbox
.lock()
.await
.bind_watcher
.add_container(cid.to_string(), iter::once(storage.clone()), ctx.logger)
.await?;
}
new_device("".to_string())
}
}
common_storage_handler(logger, &storage)
fn new_device(path: String) -> Result<StorageDeviceObject> {
let device = StorageDeviceGeneric::new(path);
Ok(Arc::new(Mutex::new(device)))
}
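new_device wraps the StorageDeviceGeneric in Arc<Mutex<...>> so that the sandbox's storage table and the caller share one lockable handle; StorageDeviceObject is presumably an alias for that shared trait object. A minimal sketch of the ownership shape (simplified trait, std Mutex instead of the agent's async one):

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins; the real types live in kata_types::mount.
trait StorageDevice: Send {
    fn path(&self) -> &str;
}

struct StorageDeviceGeneric {
    path: String,
}

impl StorageDevice for StorageDeviceGeneric {
    fn path(&self) -> &str {
        &self.path
    }
}

type StorageDeviceObject = Arc<Mutex<dyn StorageDevice>>;

fn new_device(path: String) -> StorageDeviceObject {
    // One clone goes into the sandbox storage table, one to the caller;
    // the device drops when the last Arc goes away.
    Arc::new(Mutex::new(StorageDeviceGeneric { path }))
}

fn main() {
    let dev = new_device("/run/kata-containers/example".into()); // hypothetical path
    assert_eq!(dev.lock().unwrap().path(), "/run/kata-containers/example");
}
```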
#[instrument]
@@ -552,39 +676,6 @@ fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String>
Ok(storage.mount_point.clone())
}
// nvdimm_storage_handler handles the storage for NVDIMM driver.
#[instrument]
async fn nvdimm_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let storage = storage.clone();
// Wait for the NVDIMM device at the given address.
wait_for_pmem_device(sandbox, &storage.source).await?;
common_storage_handler(logger, &storage)
}
async fn bind_watcher_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: &Arc<Mutex<Sandbox>>,
cid: Option<String>,
) -> Result<()> {
let mut locked = sandbox.lock().await;
if let Some(cid) = cid {
locked
.bind_watcher
.add_container(cid, iter::once(storage.clone()), logger)
.await
} else {
Ok(())
}
}
// mount_storage performs the mount described by the storage structure.
#[instrument]
fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
@@ -728,68 +819,67 @@ pub fn is_mounted(mount_point: &str) -> Result<bool> {
#[instrument]
pub async fn add_storages(
logger: Logger,
storages: &[Storage],
storages: Vec<Storage>,
sandbox: &Arc<Mutex<Sandbox>>,
cid: Option<String>,
) -> Result<Vec<String>> {
let mut mount_list = Vec::new();
for storage in storages {
let handler_name = &storage.driver;
let logger =
logger.new(o!( "subsystem" => "storage", "storage-type" => handler_name.to_string()));
{
let mut sb = sandbox.lock().await;
let state = sb.add_sandbox_storage(&storage.mount_point).await;
if state.ref_count().await > 1 {
continue;
}
let path = storage.mount_point.clone();
let state = sandbox.lock().await.add_sandbox_storage(&path).await;
if state.ref_count().await > 1 {
// The device already exists.
continue;
}
let res = match handler_name.as_str() {
DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, storage, sandbox).await,
DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, storage, sandbox).await,
DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, storage, sandbox).await,
DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, storage, sandbox).await,
DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, storage, sandbox).await,
DRIVER_OVERLAYFS_TYPE => {
overlayfs_storage_handler(&logger, storage, cid.as_deref(), sandbox).await
}
DRIVER_MMIO_BLK_TYPE => virtiommio_blk_storage_handler(&logger, storage, sandbox).await,
DRIVER_LOCAL_TYPE => local_storage_handler(&logger, storage, sandbox).await,
DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, storage, sandbox).await,
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, storage, sandbox).await,
DRIVER_WATCHABLE_BIND_TYPE => {
bind_watcher_storage_handler(&logger, storage, sandbox, cid.clone()).await?;
// Don't register watch mounts, they're handled separately by the watcher.
Ok(String::new())
}
_ => {
return Err(anyhow!(
"Failed to find the storage handler {}",
storage.driver.to_owned()
));
}
};
if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
let logger =
logger.new(o!( "subsystem" => "storage", "storage-type" => storage.driver.clone()));
let mut ctx = StorageContext {
cid: &cid,
logger: &logger,
sandbox,
};
let mount_point = match res {
Err(e) => {
error!(
logger,
"add_storages failed, storage: {:?}, error: {:?} ", storage, e
);
let mut sb = sandbox.lock().await;
if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await {
warn!(logger, "fail to unset sandbox storage {:?}", e);
match handler.create_device(storage, &mut ctx).await {
Ok(device) => {
match sandbox
.lock()
.await
.update_sandbox_storage(&path, device.clone())
{
Ok(d) => {
let path = device.lock().await.path().to_string();
if !path.is_empty() {
mount_list.push(path.clone());
}
drop(d);
}
Err(device) => {
error!(logger, "failed to update device for storage");
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await
{
warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
}
device.lock().await.cleanup();
return Err(anyhow!("failed to update device for storage"));
}
}
}
Err(e) => {
error!(logger, "failed to create device for storage, error: {e:?}");
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
warn!(logger, "failed to remove dummy sandbox storage {e:?}");
}
return Err(e);
}
return Err(e);
}
Ok(m) => m,
};
if !mount_point.is_empty() {
mount_list.push(mount_point);
} else {
return Err(anyhow!(
"Failed to find the storage handler {}",
storage.driver
));
}
}
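The reworked loop follows a reserve/create/commit protocol for each storage entry: reserve a refcounted slot keyed by the mount point (skipping entries whose slot already exists), create the device, then swap the real device into the slot, and remove the placeholder slot again if creation or the swap fails. A self-contained miniature of that control flow (a plain HashMap standing in for the sandbox's storage table):

```rust
use std::collections::HashMap;

// Slot value None = reserved placeholder, Some = committed device path.
struct Table(HashMap<String, Option<String>>);

impl Table {
    // Reserve a slot; report whether a device was already committed there.
    fn add(&mut self, key: &str) -> bool {
        self.0.entry(key.to_string()).or_insert(None).is_some()
    }
    fn update(&mut self, key: &str, dev: String) {
        self.0.insert(key.to_string(), Some(dev));
    }
    fn remove(&mut self, key: &str) {
        self.0.remove(key);
    }
}

fn add_storage(t: &mut Table, mount_point: &str) -> Result<(), String> {
    if t.add(mount_point) {
        return Ok(()); // device already exists, nothing to create
    }
    match create_device(mount_point) {
        Ok(dev) => {
            t.update(mount_point, dev); // commit the real device
            Ok(())
        }
        Err(e) => {
            t.remove(mount_point); // roll back the placeholder slot
            Err(e)
        }
    }
}

fn create_device(mp: &str) -> Result<String, String> {
    Ok(format!("dev:{mp}")) // stand-in for StorageHandler::create_device
}

fn main() {
    let mut t = Table(HashMap::new());
    add_storage(&mut t, "/mnt/example").unwrap();
    assert!(t.add("/mnt/example")); // second add just bumps the ref
}
```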

View File

@@ -243,13 +243,7 @@ impl AgentService {
// After all those storages have been processed, no matter the order
// here, the agent will rely on rustjail (using the oci.Mounts
// list) to bind mount all of them inside the container.
let m = add_storages(
sl(),
&req.storages,
&self.sandbox,
Some(req.container_id.clone()),
)
.await?;
let m = add_storages(sl(), req.storages, &self.sandbox, Some(req.container_id)).await?;
let mut s = self.sandbox.lock().await;
s.container_mounts.insert(cid.clone(), m);
@@ -1196,7 +1190,7 @@ impl agent_ttrpc::AgentService for AgentService {
s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
}
let m = add_storages(sl(), &req.storages, &self.sandbox, None)
let m = add_storages(sl(), req.storages, &self.sandbox, None)
.await
.map_ttrpc_err(same)?;
self.sandbox.lock().await.mounts = m;