runtime-rs: remove device

Support removing devices after container stop

Fixes: #5375
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
Signed-off-by: alex.lyn <alex.lyn@antgroup.com>
Zhongtao Hu 2023-05-11 11:52:17 +08:00
parent f16012a1eb
commit 6800d30fdb
16 changed files with 110 additions and 39 deletions
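For orientation before the hunks: the change threads the sandbox-wide `DeviceManager` into the container stop path, so that block devices attached for a rootfs or a volume are detached and their block indexes recycled once the container stops. Below is a minimal sketch of that flow, with simplified stand-ins for the real trait shapes (`Device`, `DeviceManager`, and the `cleanup` hook are abbreviated here, not the actual crate API):

```rust
use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use tokio::sync::{Mutex, RwLock};

// Stand-in for the real Device trait: detach returns the block
// index to release, if the device had claimed one.
#[async_trait]
trait Device: Send {
    async fn detach(&mut self) -> Result<Option<u64>>;
}

struct DeviceManager {
    devices: HashMap<String, Arc<Mutex<dyn Device>>>,
}

impl DeviceManager {
    async fn try_remove_device(&mut self, device_id: &str) -> Result<()> {
        let dev = self
            .devices
            .get(device_id)
            .ok_or_else(|| anyhow!("device {} not found", device_id))?;
        if let Some(_index) = dev.lock().await.detach().await? {
            // release the block index back to the sandbox pool here
        }
        // only forget the device once the detach succeeded
        self.devices.remove(device_id);
        Ok(())
    }
}

// Cleanup hooks now receive the shared manager on container stop.
async fn cleanup(dm: &RwLock<DeviceManager>, device_id: &str) -> Result<()> {
    dm.write().await.try_remove_device(device_id).await
}
```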


@@ -6,7 +6,7 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::{anyhow, Context, Ok, Result};
use anyhow::{anyhow, Context, Result};
use kata_sys_util::rand::RandomBytes;
use tokio::sync::Mutex;
@@ -19,7 +19,7 @@ use super::{
util::{get_host_path, get_virt_drive_name},
Device,
};
pub type ArcMutexBoxDevice = Arc<Mutex<dyn Device>>;
pub type ArcMutexDevice = Arc<Mutex<dyn Device>>;
/// block_index and released_block_index are used to search for an available block index
/// in the sandbox.
@@ -60,14 +60,14 @@ impl SharedInfo {
// The device manager handles the lifecycle of sandbox devices
pub struct DeviceManager {
devices: HashMap<String, ArcMutexBoxDevice>,
devices: HashMap<String, ArcMutexDevice>,
hypervisor: Arc<dyn Hypervisor>,
shared_info: SharedInfo,
}
impl DeviceManager {
pub fn new(hypervisor: Arc<dyn Hypervisor>) -> Result<Self> {
let devices = HashMap::<String, ArcMutexBoxDevice>::new();
let devices = HashMap::<String, ArcMutexDevice>::new();
Ok(DeviceManager {
devices,
hypervisor,
@@ -87,23 +87,52 @@ impl DeviceManager {
}
pub async fn try_add_device(&mut self, device_id: &str) -> Result<()> {
// find the device
let device = self
.devices
.get(device_id)
.context("failed to find device")?;
let mut device_guard = device.lock().await;
// attach device
let result = device.lock().await.attach(self.hypervisor.as_ref()).await;
let result = device_guard.attach(self.hypervisor.as_ref()).await;
// handle attach error
if let Err(e) = result {
if let DeviceConfig::Block(config) = device.lock().await.get_device_info().await {
if let DeviceConfig::Block(config) = device_guard.get_device_info().await {
self.shared_info.release_device_index(config.index);
};
drop(device_guard);
self.devices.remove(device_id);
return Err(e);
}
Ok(())
}
pub async fn try_remove_device(&mut self, device_id: &str) -> Result<()> {
if let Some(dev) = self.devices.get(device_id) {
let mut device_guard = dev.lock().await;
let result = match device_guard.detach(self.hypervisor.as_ref()).await {
Ok(index) => {
if let Some(i) = index {
// release the declared block device index
self.shared_info.release_device_index(i);
}
Ok(())
}
Err(e) => Err(e),
};
if result.is_ok() {
drop(device_guard);
// if the detach succeeded, remove the device from the device manager
self.devices.remove(device_id);
}
return result;
}
Err(anyhow!(
"device with the specified ID hasn't been created: {}",
device_id
))
}
pub async fn get_device_info(&self, device_id: &str) -> Result<DeviceConfig> {
if let Some(dev) = self.devices.get(device_id) {
return Ok(dev.lock().await.get_device_info().await);
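A side note on the `drop(device_guard)` calls above: the guard returned by `lock().await` borrows the entry in `self.devices`, so it has to go out of scope before `self.devices.remove(device_id)` can take a mutable borrow of the map. A standalone illustration of the pattern (names are illustrative):

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

async fn detach_and_forget(devices: &mut HashMap<String, Arc<Mutex<u32>>>, id: &str) {
    if let Some(entry) = devices.get(id) {
        let guard = entry.lock().await;
        // ... detach work via `guard` ...
        drop(guard); // ends the borrow chain guard -> entry -> devices
        devices.remove(id); // a mutable borrow of the map is now allowed
    }
}
```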
@@ -140,7 +169,7 @@ impl DeviceManager {
// device ID must be generated by the manager instead of the device itself
// to avoid ID collisions
let device_id = self.new_device_id()?;
let dev: ArcMutexBoxDevice = match device_config {
let dev: ArcMutexDevice = match device_config {
DeviceConfig::Block(config) => self
.create_block_device(config, device_id.clone())
.await
@@ -158,7 +187,7 @@
&mut self,
config: &BlockConfig,
device_id: String,
) -> Result<ArcMutexBoxDevice> {
) -> Result<ArcMutexDevice> {
let mut block_config = config.clone();
block_config.id = device_id.clone();
// get hypervisor block driver
@@ -181,8 +210,8 @@
block_config.index = current_index;
let drive_name = get_virt_drive_name(current_index as i32)?;
block_config.virt_path = format!("/dev/{}", drive_name);
// if the path on host is empty, we need to get device host path from major and minor
// Otherwise, it might be rawfile based block device
// if the path on host is empty, we need to derive the host path from the device's major and minor numbers
// Otherwise, it is likely a rawfile-based block device; the host path was already passed in by the runtime, so nothing more is needed here
if block_config.path_on_host.is_empty() {
block_config.path_on_host = get_host_path("b".to_owned(), config.major, config.minor)
.context("failed to get host path")?;
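For context on the `virt_path` assignment above: `get_virt_drive_name` maps the allocated block index to a virtio drive name following the kernel's disk-naming scheme (0 → `vda`, 25 → `vdz`, 26 → `vdaa`, …). A sketch of that base-26 mapping, which may differ from the real helper in bounds checking:

```rust
// 0 -> "vda", 25 -> "vdz", 26 -> "vdaa", 27 -> "vdab", ...
fn virt_drive_name(mut index: i32) -> Result<String, String> {
    if index < 0 {
        return Err("index must be non-negative".into());
    }
    let mut suffix = Vec::new();
    loop {
        suffix.push(b'a' + (index % 26) as u8);
        index = index / 26 - 1;
        if index < 0 {
            break;
        }
    }
    suffix.reverse();
    Ok(format!("vd{}", String::from_utf8(suffix).expect("ascii")))
}
```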


@@ -74,6 +74,7 @@ impl Device for BlockConfig {
}
async fn detach(&mut self, h: &dyn hypervisor) -> Result<Option<u64>> {
// decrease the attach count; skip the actual detach unless this was the last user
if self
.decrease_attach_count()
.await
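The hunk above gates the detach on an attach count, which is what makes removal safe for devices shared by several users: only the last detach triggers the actual hot-unplug, and a detach with nothing attached becomes a no-op. A minimal sketch of that bookkeeping, assuming a plain counter (the real state lives on the device types):

```rust
struct AttachState {
    attach_count: u64,
}

impl AttachState {
    // Returns true when the caller should skip the real unplug.
    fn decrease_attach_count(&mut self) -> bool {
        match self.attach_count {
            0 => true, // nothing attached: nothing to do
            1 => {
                self.attach_count = 0;
                false // last user gone: perform the actual detach
            }
            _ => {
                self.attach_count -= 1;
                true // still referenced elsewhere: keep the device
            }
        }
    }
}
```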


@@ -11,6 +11,7 @@ use agent::types::Device;
use agent::{Agent, Storage};
use anyhow::Result;
use async_trait::async_trait;
use hypervisor::device::device_manager::DeviceManager;
use hypervisor::Hypervisor;
use kata_types::config::TomlConfig;
use kata_types::mount::Mount;
@@ -52,6 +53,11 @@ impl ResourceManager {
inner.config()
}
pub async fn get_device_manager(&self) -> Arc<RwLock<DeviceManager>> {
let inner = self.inner.read().await;
inner.get_device_manager()
}
pub async fn prepare_before_start_vm(&self, device_configs: Vec<ResourceConfig>) -> Result<()> {
let mut inner = self.inner.write().await;
inner.prepare_before_start_vm(device_configs).await
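`get_device_manager` clones the `Arc` out from behind the inner read lock, so the `ResourceManager` lock is held only for the accessor itself; callers then lock the `DeviceManager` independently. A self-contained sketch of that accessor pattern (`u32` stands in for `DeviceManager`):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Inner {
    device_manager: Arc<RwLock<u32>>, // u32 stands in for DeviceManager
}

struct Manager {
    inner: Arc<RwLock<Inner>>,
}

impl Manager {
    // Clone the Arc while holding the outer lock only briefly; the
    // returned handle is locked by the caller when needed.
    async fn get_device_manager(&self) -> Arc<RwLock<u32>> {
        self.inner.read().await.device_manager.clone()
    }
}
```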


@@ -72,6 +72,10 @@ impl ResourceManagerInner {
self.toml_config.clone()
}
pub fn get_device_manager(&self) -> Arc<RwLock<DeviceManager>> {
self.device_manager.clone()
}
pub async fn prepare_before_start_vm(
&mut self,
device_configs: Vec<ResourceConfig>,


@@ -104,8 +104,13 @@ impl Rootfs for BlockRootfs {
async fn get_device_id(&self) -> Result<Option<String>> {
Ok(Some(self.device_id.clone()))
}
async fn cleanup(&self) -> Result<()> {
Ok(())
async fn cleanup(&self, device_manager: &RwLock<DeviceManager>) -> Result<()> {
device_manager
.write()
.await
.try_remove_device(&self.device_id)
.await
}
}


@@ -27,7 +27,7 @@ pub trait Rootfs: Send + Sync {
async fn get_guest_rootfs_path(&self) -> Result<String>;
async fn get_rootfs_mount(&self) -> Result<Vec<oci::Mount>>;
async fn get_storage(&self) -> Option<Storage>;
async fn cleanup(&self) -> Result<()>;
async fn cleanup(&self, device_manager: &RwLock<DeviceManager>) -> Result<()>;
async fn get_device_id(&self) -> Result<Option<String>>;
}
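Every `Rootfs` implementation now receives the manager even when it has nothing to detach; the share-fs and Nydus variants below simply ignore the argument. A stub implementor under the new signature (simplified trait and `DeviceManager` stand-in):

```rust
use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::RwLock;

struct DeviceManager; // stand-in for the hypervisor crate's type

#[async_trait]
trait Rootfs: Send + Sync {
    async fn cleanup(&self, device_manager: &RwLock<DeviceManager>) -> Result<()>;
}

struct NoopRootfs;

#[async_trait]
impl Rootfs for NoopRootfs {
    // Variants that never attached a device just ignore the manager.
    async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
        Ok(())
    }
}
```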


@@ -16,8 +16,9 @@ use crate::{
use agent::Storage;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use hypervisor::Hypervisor;
use hypervisor::{device::device_manager::DeviceManager, Hypervisor};
use kata_types::mount::{Mount, NydusExtraOptions};
use tokio::sync::RwLock;
// Used for nydus rootfs
pub(crate) const NYDUS_ROOTFS_TYPE: &str = "fuse.nydus-overlayfs";
@@ -154,7 +155,7 @@ impl Rootfs for NydusRootfs {
Ok(None)
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
// TODO: Clean up NydusRootfs after the container is killed
warn!(sl!(), "Cleaning up NydusRootfs is still unimplemented.");
Ok(())


@@ -9,8 +9,10 @@ use std::sync::Arc;
use agent::Storage;
use anyhow::{Context, Result};
use async_trait::async_trait;
use hypervisor::device::device_manager::DeviceManager;
use kata_sys_util::mount::{umount_timeout, Mounter};
use kata_types::mount::Mount;
use tokio::sync::RwLock;
use super::{Rootfs, ROOTFS};
use crate::share_fs::{ShareFs, ShareFsRootfsConfig};
@@ -79,7 +81,7 @@ impl Rootfs for ShareFsRootfs {
Ok(None)
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
// Umount the mount point shared to guest
let share_fs_mount = self.share_fs.get_share_fs_mount();
share_fs_mount


@@ -13,7 +13,7 @@ use crate::share_fs::{do_get_guest_path, do_get_host_path};
use super::{share_fs_volume::generate_mount_path, Volume};
use agent::Storage;
use anyhow::{anyhow, Context};
use hypervisor::{device::DeviceManager, BlockConfig, DeviceConfig};
use hypervisor::{device::device_manager::DeviceManager, BlockConfig, DeviceConfig};
use nix::sys::stat::{self, SFlag};
use tokio::sync::RwLock;
#[derive(Debug)]
@@ -128,10 +128,12 @@ impl Volume for BlockVolume {
Ok(s)
}
async fn cleanup(&self) -> Result<()> {
// TODO: Clean up BlockVolume
warn!(sl!(), "Cleaning up BlockVolume is still unimplemented.");
Ok(())
async fn cleanup(&self, device_manager: &RwLock<DeviceManager>) -> Result<()> {
device_manager
.write()
.await
.try_remove_device(&self.device_id)
.await
}
fn get_device_id(&self) -> Result<Option<String>> {


@@ -4,6 +4,9 @@
// SPDX-License-Identifier: Apache-2.0
//
use hypervisor::device::device_manager::DeviceManager;
use tokio::sync::RwLock;
use anyhow::Result;
use async_trait::async_trait;
@@ -33,7 +36,7 @@ impl Volume for DefaultVolume {
Ok(vec![])
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
// TODO: Clean up DefaultVolume
warn!(sl!(), "Cleaning up DefaultVolume is still unimplemented.");
Ok(())


@@ -15,9 +15,10 @@ use agent::Storage;
use anyhow::{anyhow, Context, Ok, Result};
use async_trait::async_trait;
use byte_unit::Byte;
use hypervisor::HUGETLBFS;
use hypervisor::{device::device_manager::DeviceManager, HUGETLBFS};
use kata_sys_util::{fs::get_base_name, mount::PROC_MOUNTS_FILE};
use kata_types::mount::KATA_EPHEMERAL_VOLUME_TYPE;
use tokio::sync::RwLock;
use super::{Volume, BIND};
@@ -88,7 +89,7 @@ impl Volume for Hugepage {
Ok(s)
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
Ok(())
}


@@ -12,7 +12,7 @@ mod shm_volume;
use async_trait::async_trait;
use anyhow::{Context, Result};
use hypervisor::device::DeviceManager;
use hypervisor::device::device_manager::DeviceManager;
use std::{sync::Arc, vec::Vec};
use tokio::sync::RwLock;
@@ -27,7 +27,7 @@ pub trait Volume: Send + Sync {
fn get_volume_mount(&self) -> Result<Vec<oci::Mount>>;
fn get_storage(&self) -> Result<Vec<agent::Storage>>;
fn get_device_id(&self) -> Result<Option<String>>;
async fn cleanup(&self) -> Result<()>;
async fn cleanup(&self, device_manager: &RwLock<DeviceManager>) -> Result<()>;
}
#[derive(Default)]


@@ -12,6 +12,8 @@ use std::{
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use hypervisor::device::device_manager::DeviceManager;
use tokio::sync::RwLock;
use super::Volume;
use crate::share_fs::{MountedInfo, ShareFs, ShareFsVolumeConfig};
@@ -158,7 +160,7 @@ impl Volume for ShareFsVolume {
Ok(self.storages.clone())
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
let share_fs = match self.share_fs.as_ref() {
Some(fs) => fs,
None => return Ok(()),


@@ -8,6 +8,8 @@ use std::path::Path;
use anyhow::Result;
use async_trait::async_trait;
use hypervisor::device::device_manager::DeviceManager;
use tokio::sync::RwLock;
use super::Volume;
use crate::share_fs::DEFAULT_KATA_GUEST_SANDBOX_DIR;
@@ -99,7 +101,7 @@ impl Volume for ShmVolume {
Ok(s)
}
async fn cleanup(&self) -> Result<()> {
async fn cleanup(&self, _device_manager: &RwLock<DeviceManager>) -> Result<()> {
// TODO: Clean up ShmVolume
warn!(sl!(), "Cleaning up ShmVolume is still unimplemented.");
Ok(())


@@ -183,7 +183,8 @@ impl Container {
match process.process_type {
ProcessType::Container => {
if let Err(err) = inner.start_container(&process.container_id).await {
let _ = inner.stop_process(process, true).await;
let device_manager = self.resource_manager.get_device_manager().await;
let _ = inner.stop_process(process, true, &device_manager).await;
return Err(err);
}
@@ -195,7 +196,8 @@
}
ProcessType::Exec => {
if let Err(e) = inner.start_exec_process(process).await {
let _ = inner.stop_process(process, true).await;
let device_manager = self.resource_manager.get_device_manager().await;
let _ = inner.stop_process(process, true, &device_manager).await;
return Err(e).context("enter process");
}
@@ -277,7 +279,10 @@ impl Container {
all: bool,
) -> Result<()> {
let mut inner = self.inner.write().await;
inner.signal_process(container_process, signal, all).await
let device_manager = self.resource_manager.get_device_manager().await;
inner
.signal_process(container_process, signal, all, &device_manager)
.await
}
pub async fn exec_process(
@@ -314,8 +319,9 @@
pub async fn stop_process(&self, container_process: &ContainerProcess) -> Result<()> {
let mut inner = self.inner.write().await;
let device_manager = self.resource_manager.get_device_manager().await;
inner
.stop_process(container_process, true)
.stop_process(container_process, true, &device_manager)
.await
.context("stop process")
}


@@ -12,6 +12,7 @@ use common::{
error::Error,
types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
};
use hypervisor::device::device_manager::DeviceManager;
use nix::sys::signal::Signal;
use resource::{rootfs::Rootfs, volume::Volume};
use tokio::sync::RwLock;
@@ -193,6 +194,7 @@ impl ContainerInner {
&mut self,
process: &ContainerProcess,
force: bool,
device_manager: &RwLock<DeviceManager>,
) -> Result<()> {
let logger = logger_with_process(process);
info!(logger, "begin to stop process");
@@ -212,7 +214,7 @@
// send kill signal to container
// ignore any error from sending the signal, since the process may
// have already been killed and exited.
self.signal_process(process, Signal::SIGKILL as u32, false)
self.signal_process(process, Signal::SIGKILL as u32, false, device_manager)
.await
.map_err(|e| {
warn!(logger, "failed to signal kill. {:?}", e);
@@ -242,6 +244,7 @@
process: &ContainerProcess,
signal: u32,
all: bool,
device_manager: &RwLock<DeviceManager>,
) -> Result<()> {
let mut process_id: agent::ContainerProcessID = process.clone().into();
if all {
@@ -253,8 +256,12 @@
.signal_process(agent::SignalProcessRequest { process_id, signal })
.await?;
self.clean_volumes().await.context("clean volumes")?;
self.clean_rootfs().await.context("clean rootfs")?;
self.clean_volumes(device_manager)
.await
.context("clean volumes")?;
self.clean_rootfs(device_manager)
.await
.context("clean rootfs")?;
Ok(())
}
@@ -278,10 +285,10 @@ impl ContainerInner {
Ok(())
}
async fn clean_volumes(&mut self) -> Result<()> {
async fn clean_volumes(&mut self, device_manager: &RwLock<DeviceManager>) -> Result<()> {
let mut unhandled = Vec::new();
for v in self.volumes.iter() {
if let Err(err) = v.cleanup().await {
if let Err(err) = v.cleanup(device_manager).await {
unhandled.push(Arc::clone(v));
warn!(
sl!(),
@@ -297,10 +304,10 @@
Ok(())
}
async fn clean_rootfs(&mut self) -> Result<()> {
async fn clean_rootfs(&mut self, device_manager: &RwLock<DeviceManager>) -> Result<()> {
let mut unhandled = Vec::new();
for rootfs in self.rootfs.iter() {
if let Err(err) = rootfs.cleanup().await {
if let Err(err) = rootfs.cleanup(device_manager).await {
unhandled.push(Arc::clone(rootfs));
warn!(
sl!(),