mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-01 11:56:29 +00:00
agent: simplify the way to manage storage object
Simplify the way to manage storage objects, and introduce StorageStateCommon structures for coming extensions. Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
parent
8392c71bf2
commit
b03b1f6134
@ -741,8 +741,8 @@ pub async fn add_storages(
|
||||
|
||||
{
|
||||
let mut sb = sandbox.lock().await;
|
||||
let new_storage = sb.set_sandbox_storage(&storage.mount_point);
|
||||
if !new_storage {
|
||||
let state = sb.add_sandbox_storage(&storage.mount_point).await;
|
||||
if state.ref_count().await > 1 {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -780,7 +780,7 @@ pub async fn add_storages(
|
||||
"add_storages failed, storage: {:?}, error: {:?} ", storage, e
|
||||
);
|
||||
let mut sb = sandbox.lock().await;
|
||||
if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) {
|
||||
if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await {
|
||||
warn!(logger, "fail to unset sandbox storage {:?}", e);
|
||||
}
|
||||
return Err(e);
|
||||
@ -1884,7 +1884,7 @@ mod tests {
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let result = parse_mount_options(&d.options_vec)?;
|
||||
let result = parse_mount_options(&d.options_vec).unwrap();
|
||||
|
||||
let msg = format!("{}: result: {:?}", msg, result);
|
||||
|
||||
|
@ -308,7 +308,7 @@ impl AgentService {
|
||||
if let Err(e) = ctr.destroy().await {
|
||||
error!(sl(), "failed to destroy container: {:?}", e);
|
||||
}
|
||||
if let Err(e) = remove_container_resources(&mut s, &cid) {
|
||||
if let Err(e) = remove_container_resources(&mut s, &cid).await {
|
||||
error!(sl(), "failed to remove container resources: {:?}", e);
|
||||
}
|
||||
return Err(err);
|
||||
@ -360,7 +360,7 @@ impl AgentService {
|
||||
.ok_or_else(|| anyhow!("Invalid container id"))?
|
||||
.destroy()
|
||||
.await?;
|
||||
remove_container_resources(&mut sandbox, &cid)?;
|
||||
remove_container_resources(&mut sandbox, &cid).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -382,7 +382,7 @@ impl AgentService {
|
||||
.await
|
||||
.map_err(|_| anyhow!(nix::Error::ETIME))???;
|
||||
|
||||
remove_container_resources(&mut *self.sandbox.lock().await, &cid)
|
||||
remove_container_resources(&mut *self.sandbox.lock().await, &cid).await
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
@ -1679,7 +1679,7 @@ fn update_container_namespaces(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
|
||||
async fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
|
||||
let mut cmounts: Vec<String> = vec![];
|
||||
|
||||
// Find the sandbox storage used by this container
|
||||
@ -1693,7 +1693,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
|
||||
}
|
||||
|
||||
for m in cmounts.iter() {
|
||||
if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) {
|
||||
if let Err(err) = sandbox.remove_sandbox_storage(m).await {
|
||||
error!(
|
||||
sl(),
|
||||
"failed to unset_and_remove_sandbox_storage for container {}, error: {:?}",
|
||||
|
@ -3,6 +3,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
@ -13,6 +14,7 @@ use std::{thread, time};
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use kata_types::cpu::CpuSet;
|
||||
use kata_types::mount::StorageDeviceGeneric;
|
||||
use libc::pid_t;
|
||||
use oci::{Hook, Hooks};
|
||||
use protocols::agent::OnlineCPUMemRequest;
|
||||
@ -28,7 +30,7 @@ use tokio::sync::Mutex;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::linux_abi::*;
|
||||
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
|
||||
use crate::mount::{get_mount_fs_type, is_mounted, remove_mounts, TYPE_ROOTFS};
|
||||
use crate::namespace::Namespace;
|
||||
use crate::netlink::Handle;
|
||||
use crate::network::Network;
|
||||
@ -40,6 +42,31 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
|
||||
|
||||
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StorageState {
|
||||
inner: Arc<Mutex<StorageDeviceGeneric>>,
|
||||
}
|
||||
|
||||
impl StorageState {
|
||||
fn new() -> Self {
|
||||
StorageState {
|
||||
inner: Arc::new(Mutex::new(StorageDeviceGeneric::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ref_count(&self) -> u32 {
|
||||
self.inner.lock().await.ref_count()
|
||||
}
|
||||
|
||||
async fn inc_ref_count(&self) {
|
||||
self.inner.lock().await.inc_ref_count()
|
||||
}
|
||||
|
||||
async fn dec_and_test_ref_count(&self) -> bool {
|
||||
self.inner.lock().await.dec_and_test_ref_count()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Sandbox {
|
||||
pub logger: Logger,
|
||||
@ -54,7 +81,7 @@ pub struct Sandbox {
|
||||
pub shared_utsns: Namespace,
|
||||
pub shared_ipcns: Namespace,
|
||||
pub sandbox_pidns: Option<Namespace>,
|
||||
pub storages: HashMap<String, u32>,
|
||||
pub storages: HashMap<String, StorageState>,
|
||||
pub running: bool,
|
||||
pub no_pivot_root: bool,
|
||||
pub sender: Option<tokio::sync::oneshot::Sender<i32>>,
|
||||
@ -100,52 +127,38 @@ impl Sandbox {
|
||||
})
|
||||
}
|
||||
|
||||
// set_sandbox_storage sets the sandbox level reference
|
||||
// counter for the sandbox storage.
|
||||
// This method also returns a boolean to let
|
||||
// callers know if the storage already existed or not.
|
||||
// It will return true if storage is new.
|
||||
/// Add a new storage object or increase reference count of existing one.
|
||||
/// The caller may detect new storage object by checking `StorageState.refcount == 1`.
|
||||
#[instrument]
|
||||
pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
|
||||
match self.storages.get_mut(path) {
|
||||
None => {
|
||||
self.storages.insert(path.to_string(), 1);
|
||||
true
|
||||
pub async fn add_sandbox_storage(&mut self, path: &str) -> StorageState {
|
||||
match self.storages.entry(path.to_string()) {
|
||||
Entry::Occupied(e) => {
|
||||
let state = e.get().clone();
|
||||
state.inc_ref_count().await;
|
||||
state
|
||||
}
|
||||
Some(count) => {
|
||||
*count += 1;
|
||||
false
|
||||
Entry::Vacant(e) => {
|
||||
let state = StorageState::new();
|
||||
e.insert(state.clone());
|
||||
state
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// unset_sandbox_storage will decrement the sandbox storage
|
||||
// reference counter. If there aren't any containers using
|
||||
// that sandbox storage, this method will remove the
|
||||
// storage reference from the sandbox and return 'true' to
|
||||
// let the caller know that they can clean up the storage
|
||||
// related directories by calling remove_sandbox_storage
|
||||
// Clean mount and directory of a mountpoint.
|
||||
#[instrument]
|
||||
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
|
||||
match self.storages.get_mut(path) {
|
||||
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
|
||||
Some(count) => {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
self.storages.remove(path);
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
fn cleanup_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
||||
if path.is_empty() {
|
||||
return Err(anyhow!("mountpoint path is empty"));
|
||||
} else if !Path::new(path).exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if matches!(is_mounted(path), Ok(true)) {
|
||||
let mounts = vec![path.to_string()];
|
||||
remove_mounts(&mounts)?;
|
||||
}
|
||||
}
|
||||
|
||||
// remove_sandbox_storage removes the sandbox storage if no
|
||||
// containers are using that storage.
|
||||
#[instrument]
|
||||
pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
||||
let mounts = vec![path.to_string()];
|
||||
remove_mounts(&mounts)?;
|
||||
// "remove_dir" will fail if the mount point is backed by a read-only filesystem.
|
||||
// This is the case with the device mapper snapshotter, where we mount the block device directly
|
||||
// at the underlying sandbox path which was provided from the base RO kataShared path from the host.
|
||||
@ -155,16 +168,23 @@ impl Sandbox {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// unset_and_remove_sandbox_storage unsets the storage from sandbox
|
||||
// and if there are no containers using this storage it will
|
||||
// remove it from the sandbox.
|
||||
/// Decrease reference count and destroy the storage object if reference count reaches zero.
|
||||
/// Returns `Ok(true)` if the reference count has reached zero and the storage object has been
|
||||
/// removed.
|
||||
#[instrument]
|
||||
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
||||
if self.unset_sandbox_storage(path)? {
|
||||
return self.remove_sandbox_storage(path);
|
||||
pub async fn remove_sandbox_storage(&mut self, path: &str) -> Result<bool> {
|
||||
match self.storages.get(path) {
|
||||
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
|
||||
Some(state) => {
|
||||
if state.dec_and_test_ref_count().await {
|
||||
self.storages.remove(path);
|
||||
self.cleanup_sandbox_storage(path)?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
@ -493,24 +513,22 @@ mod tests {
|
||||
let tmpdir_path = tmpdir.path().to_str().unwrap();
|
||||
|
||||
// Add a new sandbox storage
|
||||
let new_storage = s.set_sandbox_storage(tmpdir_path);
|
||||
let new_storage = s.add_sandbox_storage(tmpdir_path).await;
|
||||
|
||||
// Check the reference counter
|
||||
let ref_count = s.storages[tmpdir_path];
|
||||
let ref_count = new_storage.ref_count().await;
|
||||
assert_eq!(
|
||||
ref_count, 1,
|
||||
"Invalid refcount, got {} expected 1.",
|
||||
ref_count
|
||||
);
|
||||
assert!(new_storage);
|
||||
|
||||
// Use the existing sandbox storage
|
||||
let new_storage = s.set_sandbox_storage(tmpdir_path);
|
||||
assert!(!new_storage, "Should be false as already exists.");
|
||||
let new_storage = s.add_sandbox_storage(tmpdir_path).await;
|
||||
|
||||
// Since we are using existing storage, the reference counter
|
||||
// should be 2 by now.
|
||||
let ref_count = s.storages[tmpdir_path];
|
||||
let ref_count = new_storage.ref_count().await;
|
||||
assert_eq!(
|
||||
ref_count, 2,
|
||||
"Invalid refcount, got {} expected 2.",
|
||||
@ -546,22 +564,20 @@ mod tests {
|
||||
.tempdir_in(tmpdir_path)
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
s.remove_sandbox_storage(srcdir_path).is_err(),
|
||||
"Expect Err as the directory is not a mountpoint"
|
||||
);
|
||||
|
||||
assert!(s.remove_sandbox_storage("").is_err());
|
||||
assert!(s.cleanup_sandbox_storage("").is_err());
|
||||
|
||||
let invalid_dir = emptydir.path().join("invalid");
|
||||
|
||||
assert!(s
|
||||
.remove_sandbox_storage(invalid_dir.to_str().unwrap())
|
||||
.is_err());
|
||||
.cleanup_sandbox_storage(invalid_dir.to_str().unwrap())
|
||||
.is_ok());
|
||||
|
||||
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
|
||||
|
||||
assert!(s.remove_sandbox_storage(destdir_path).is_ok());
|
||||
assert!(s.cleanup_sandbox_storage(destdir_path).is_ok());
|
||||
|
||||
// remove a directory without umount
|
||||
s.cleanup_sandbox_storage(srcdir_path).unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -573,8 +589,7 @@ mod tests {
|
||||
let mut s = Sandbox::new(&logger).unwrap();
|
||||
|
||||
assert!(
|
||||
s.unset_and_remove_sandbox_storage("/tmp/testEphePath")
|
||||
.is_err(),
|
||||
s.remove_sandbox_storage("/tmp/testEphePath").await.is_err(),
|
||||
"Should fail because sandbox storage doesn't exist"
|
||||
);
|
||||
|
||||
@ -595,8 +610,8 @@ mod tests {
|
||||
|
||||
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
|
||||
|
||||
assert!(s.set_sandbox_storage(destdir_path));
|
||||
assert!(s.unset_and_remove_sandbox_storage(destdir_path).is_ok());
|
||||
s.add_sandbox_storage(destdir_path).await;
|
||||
assert!(s.remove_sandbox_storage(destdir_path).await.is_ok());
|
||||
|
||||
let other_dir_str;
|
||||
{
|
||||
@ -609,10 +624,10 @@ mod tests {
|
||||
let other_dir_path = other_dir.path().to_str().unwrap();
|
||||
other_dir_str = other_dir_path.to_string();
|
||||
|
||||
assert!(s.set_sandbox_storage(other_dir_path));
|
||||
s.add_sandbox_storage(other_dir_path).await;
|
||||
}
|
||||
|
||||
assert!(s.unset_and_remove_sandbox_storage(&other_dir_str).is_err());
|
||||
assert!(s.remove_sandbox_storage(&other_dir_str).await.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -624,28 +639,30 @@ mod tests {
|
||||
let storage_path = "/tmp/testEphe";
|
||||
|
||||
// Add a new sandbox storage
|
||||
assert!(s.set_sandbox_storage(storage_path));
|
||||
s.add_sandbox_storage(storage_path).await;
|
||||
// Use the existing sandbox storage
|
||||
let state = s.add_sandbox_storage(storage_path).await;
|
||||
assert!(
|
||||
!s.set_sandbox_storage(storage_path),
|
||||
state.ref_count().await > 1,
|
||||
"Expects false as the storage is not new."
|
||||
);
|
||||
|
||||
assert!(
|
||||
!s.unset_sandbox_storage(storage_path).unwrap(),
|
||||
!s.remove_sandbox_storage(storage_path).await.unwrap(),
|
||||
"Expects false as there is still a storage."
|
||||
);
|
||||
|
||||
// Reference counter should decrement to 1.
|
||||
let ref_count = s.storages[storage_path];
|
||||
let storage = &s.storages[storage_path];
|
||||
let refcount = storage.ref_count().await;
|
||||
assert_eq!(
|
||||
ref_count, 1,
|
||||
refcount, 1,
|
||||
"Invalid refcount, got {} expected 1.",
|
||||
ref_count
|
||||
refcount
|
||||
);
|
||||
|
||||
assert!(
|
||||
s.unset_sandbox_storage(storage_path).unwrap(),
|
||||
s.remove_sandbox_storage(storage_path).await.unwrap(),
|
||||
"Expects true as there is still a storage."
|
||||
);
|
||||
|
||||
@ -661,7 +678,7 @@ mod tests {
|
||||
// If no container is using the sandbox storage, the reference
|
||||
// counter for it should not exist.
|
||||
assert!(
|
||||
s.unset_sandbox_storage(storage_path).is_err(),
|
||||
s.remove_sandbox_storage(storage_path).await.is_err(),
|
||||
"Expects false as the reference counter should no exist."
|
||||
);
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
use anyhow::{anyhow, Context, Error, Result};
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Formatter;
|
||||
use std::{collections::HashMap, fs, path::PathBuf};
|
||||
|
||||
/// Prefix to mark a volume as Kata special.
|
||||
@ -427,6 +428,43 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume {
|
||||
}
|
||||
}
|
||||
|
||||
/// An implementation of generic storage device.
|
||||
pub struct StorageDeviceGeneric {
|
||||
refcount: u32,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for StorageDeviceGeneric {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("StorageState")
|
||||
.field("refcount", &self.refcount)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl StorageDeviceGeneric {
|
||||
/// Create a new instance of `StorageStateCommon`.
|
||||
pub fn new() -> Self {
|
||||
StorageDeviceGeneric { refcount: 0 }
|
||||
}
|
||||
|
||||
/// Get reference count.
|
||||
pub fn ref_count(&self) -> u32 {
|
||||
self.refcount
|
||||
}
|
||||
|
||||
/// Decrease reference count and return true if it reaches zero.
|
||||
pub fn inc_ref_count(&mut self) {
|
||||
self.refcount += 1;
|
||||
}
|
||||
|
||||
/// Decrease reference count and return true if it reaches zero.
|
||||
pub fn dec_and_test_ref_count(&mut self) -> bool {
|
||||
assert!(self.refcount > 0);
|
||||
self.refcount -= 1;
|
||||
self.refcount == 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Join user provided volume path with kata direct-volume root path.
|
||||
///
|
||||
/// The `volume_path` is base64-encoded and then safely joined to the `prefix`
|
||||
@ -659,4 +697,18 @@ mod tests {
|
||||
);
|
||||
assert_eq!(volume.fs_type.as_str(), "rafsv6")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_storage_state_common() {
|
||||
let mut state = StorageDeviceGeneric::new();
|
||||
assert_eq!(state.ref_count(), 0);
|
||||
state.inc_ref_count();
|
||||
assert_eq!(state.ref_count(), 1);
|
||||
state.inc_ref_count();
|
||||
assert_eq!(state.ref_count(), 2);
|
||||
assert!(!state.dec_and_test_ref_count());
|
||||
assert_eq!(state.ref_count(), 1);
|
||||
assert!(state.dec_and_test_ref_count());
|
||||
assert_eq!(state.ref_count(), 0);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user