agent: simplify the way to manage storage object

Simplify the way to manage storage objects, and introduce
StorageStateCommon structures for coming extensions.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-08 21:52:02 +08:00
parent 8392c71bf2
commit b03b1f6134
4 changed files with 154 additions and 85 deletions

View File

@ -741,8 +741,8 @@ pub async fn add_storages(
{ {
let mut sb = sandbox.lock().await; let mut sb = sandbox.lock().await;
let new_storage = sb.set_sandbox_storage(&storage.mount_point); let state = sb.add_sandbox_storage(&storage.mount_point).await;
if !new_storage { if state.ref_count().await > 1 {
continue; continue;
} }
} }
@ -780,7 +780,7 @@ pub async fn add_storages(
"add_storages failed, storage: {:?}, error: {:?} ", storage, e "add_storages failed, storage: {:?}, error: {:?} ", storage, e
); );
let mut sb = sandbox.lock().await; let mut sb = sandbox.lock().await;
if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) { if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await {
warn!(logger, "fail to unset sandbox storage {:?}", e); warn!(logger, "fail to unset sandbox storage {:?}", e);
} }
return Err(e); return Err(e);
@ -1884,7 +1884,7 @@ mod tests {
for (i, d) in tests.iter().enumerate() { for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d); let msg = format!("test[{}]: {:?}", i, d);
let result = parse_mount_options(&d.options_vec)?; let result = parse_mount_options(&d.options_vec).unwrap();
let msg = format!("{}: result: {:?}", msg, result); let msg = format!("{}: result: {:?}", msg, result);

View File

@ -308,7 +308,7 @@ impl AgentService {
if let Err(e) = ctr.destroy().await { if let Err(e) = ctr.destroy().await {
error!(sl(), "failed to destroy container: {:?}", e); error!(sl(), "failed to destroy container: {:?}", e);
} }
if let Err(e) = remove_container_resources(&mut s, &cid) { if let Err(e) = remove_container_resources(&mut s, &cid).await {
error!(sl(), "failed to remove container resources: {:?}", e); error!(sl(), "failed to remove container resources: {:?}", e);
} }
return Err(err); return Err(err);
@ -360,7 +360,7 @@ impl AgentService {
.ok_or_else(|| anyhow!("Invalid container id"))? .ok_or_else(|| anyhow!("Invalid container id"))?
.destroy() .destroy()
.await?; .await?;
remove_container_resources(&mut sandbox, &cid)?; remove_container_resources(&mut sandbox, &cid).await?;
return Ok(()); return Ok(());
} }
@ -382,7 +382,7 @@ impl AgentService {
.await .await
.map_err(|_| anyhow!(nix::Error::ETIME))???; .map_err(|_| anyhow!(nix::Error::ETIME))???;
remove_container_resources(&mut *self.sandbox.lock().await, &cid) remove_container_resources(&mut *self.sandbox.lock().await, &cid).await
} }
#[instrument] #[instrument]
@ -1679,7 +1679,7 @@ fn update_container_namespaces(
Ok(()) Ok(())
} }
fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> { async fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
let mut cmounts: Vec<String> = vec![]; let mut cmounts: Vec<String> = vec![];
// Find the sandbox storage used by this container // Find the sandbox storage used by this container
@ -1693,7 +1693,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
} }
for m in cmounts.iter() { for m in cmounts.iter() {
if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) { if let Err(err) = sandbox.remove_sandbox_storage(m).await {
error!( error!(
sl(), sl(),
"failed to unset_and_remove_sandbox_storage for container {}, error: {:?}", "failed to unset_and_remove_sandbox_storage for container {}, error: {:?}",

View File

@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs; use std::fs;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
@ -13,6 +14,7 @@ use std::{thread, time};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_types::cpu::CpuSet; use kata_types::cpu::CpuSet;
use kata_types::mount::StorageDeviceGeneric;
use libc::pid_t; use libc::pid_t;
use oci::{Hook, Hooks}; use oci::{Hook, Hooks};
use protocols::agent::OnlineCPUMemRequest; use protocols::agent::OnlineCPUMemRequest;
@ -28,7 +30,7 @@ use tokio::sync::Mutex;
use tracing::instrument; use tracing::instrument;
use crate::linux_abi::*; use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; use crate::mount::{get_mount_fs_type, is_mounted, remove_mounts, TYPE_ROOTFS};
use crate::namespace::Namespace; use crate::namespace::Namespace;
use crate::netlink::Handle; use crate::netlink::Handle;
use crate::network::Network; use crate::network::Network;
@ -40,6 +42,31 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>); type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
#[derive(Clone, Debug)]
pub struct StorageState {
inner: Arc<Mutex<StorageDeviceGeneric>>,
}
impl StorageState {
fn new() -> Self {
StorageState {
inner: Arc::new(Mutex::new(StorageDeviceGeneric::new())),
}
}
pub async fn ref_count(&self) -> u32 {
self.inner.lock().await.ref_count()
}
async fn inc_ref_count(&self) {
self.inner.lock().await.inc_ref_count()
}
async fn dec_and_test_ref_count(&self) -> bool {
self.inner.lock().await.dec_and_test_ref_count()
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Sandbox { pub struct Sandbox {
pub logger: Logger, pub logger: Logger,
@ -54,7 +81,7 @@ pub struct Sandbox {
pub shared_utsns: Namespace, pub shared_utsns: Namespace,
pub shared_ipcns: Namespace, pub shared_ipcns: Namespace,
pub sandbox_pidns: Option<Namespace>, pub sandbox_pidns: Option<Namespace>,
pub storages: HashMap<String, u32>, pub storages: HashMap<String, StorageState>,
pub running: bool, pub running: bool,
pub no_pivot_root: bool, pub no_pivot_root: bool,
pub sender: Option<tokio::sync::oneshot::Sender<i32>>, pub sender: Option<tokio::sync::oneshot::Sender<i32>>,
@ -100,52 +127,38 @@ impl Sandbox {
}) })
} }
// set_sandbox_storage sets the sandbox level reference /// Add a new storage object or increase reference count of existing one.
// counter for the sandbox storage. /// The caller may detect new storage object by checking `StorageState.refcount == 1`.
// This method also returns a boolean to let
// callers know if the storage already existed or not.
// It will return true if storage is new.
#[instrument] #[instrument]
pub fn set_sandbox_storage(&mut self, path: &str) -> bool { pub async fn add_sandbox_storage(&mut self, path: &str) -> StorageState {
match self.storages.get_mut(path) { match self.storages.entry(path.to_string()) {
None => { Entry::Occupied(e) => {
self.storages.insert(path.to_string(), 1); let state = e.get().clone();
true state.inc_ref_count().await;
state
} }
Some(count) => { Entry::Vacant(e) => {
*count += 1; let state = StorageState::new();
false e.insert(state.clone());
state
} }
} }
} }
// unset_sandbox_storage will decrement the sandbox storage // Clean mount and directory of a mountpoint.
// reference counter. If there aren't any containers using
// that sandbox storage, this method will remove the
// storage reference from the sandbox and return 'true' to
// let the caller know that they can clean up the storage
// related directories by calling remove_sandbox_storage
#[instrument] #[instrument]
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> { fn cleanup_sandbox_storage(&mut self, path: &str) -> Result<()> {
match self.storages.get_mut(path) { if path.is_empty() {
None => Err(anyhow!("Sandbox storage with path {} not found", path)), return Err(anyhow!("mountpoint path is empty"));
Some(count) => { } else if !Path::new(path).exists() {
*count -= 1; return Ok(());
if *count == 0 { }
self.storages.remove(path);
return Ok(true); if matches!(is_mounted(path), Ok(true)) {
} let mounts = vec![path.to_string()];
Ok(false) remove_mounts(&mounts)?;
}
} }
}
// remove_sandbox_storage removes the sandbox storage if no
// containers are using that storage.
#[instrument]
pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
let mounts = vec![path.to_string()];
remove_mounts(&mounts)?;
// "remove_dir" will fail if the mount point is backed by a read-only filesystem. // "remove_dir" will fail if the mount point is backed by a read-only filesystem.
// This is the case with the device mapper snapshotter, where we mount the block device directly // This is the case with the device mapper snapshotter, where we mount the block device directly
// at the underlying sandbox path which was provided from the base RO kataShared path from the host. // at the underlying sandbox path which was provided from the base RO kataShared path from the host.
@ -155,16 +168,23 @@ impl Sandbox {
Ok(()) Ok(())
} }
// unset_and_remove_sandbox_storage unsets the storage from sandbox /// Decrease reference count and destroy the storage object if reference count reaches zero.
// and if there are no containers using this storage it will /// Returns `Ok(true)` if the reference count has reached zero and the storage object has been
// remove it from the sandbox. /// removed.
#[instrument] #[instrument]
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> { pub async fn remove_sandbox_storage(&mut self, path: &str) -> Result<bool> {
if self.unset_sandbox_storage(path)? { match self.storages.get(path) {
return self.remove_sandbox_storage(path); None => Err(anyhow!("Sandbox storage with path {} not found", path)),
Some(state) => {
if state.dec_and_test_ref_count().await {
self.storages.remove(path);
self.cleanup_sandbox_storage(path)?;
Ok(true)
} else {
Ok(false)
}
}
} }
Ok(())
} }
#[instrument] #[instrument]
@ -493,24 +513,22 @@ mod tests {
let tmpdir_path = tmpdir.path().to_str().unwrap(); let tmpdir_path = tmpdir.path().to_str().unwrap();
// Add a new sandbox storage // Add a new sandbox storage
let new_storage = s.set_sandbox_storage(tmpdir_path); let new_storage = s.add_sandbox_storage(tmpdir_path).await;
// Check the reference counter // Check the reference counter
let ref_count = s.storages[tmpdir_path]; let ref_count = new_storage.ref_count().await;
assert_eq!( assert_eq!(
ref_count, 1, ref_count, 1,
"Invalid refcount, got {} expected 1.", "Invalid refcount, got {} expected 1.",
ref_count ref_count
); );
assert!(new_storage);
// Use the existing sandbox storage // Use the existing sandbox storage
let new_storage = s.set_sandbox_storage(tmpdir_path); let new_storage = s.add_sandbox_storage(tmpdir_path).await;
assert!(!new_storage, "Should be false as already exists.");
// Since we are using existing storage, the reference counter // Since we are using existing storage, the reference counter
// should be 2 by now. // should be 2 by now.
let ref_count = s.storages[tmpdir_path]; let ref_count = new_storage.ref_count().await;
assert_eq!( assert_eq!(
ref_count, 2, ref_count, 2,
"Invalid refcount, got {} expected 2.", "Invalid refcount, got {} expected 2.",
@ -546,22 +564,20 @@ mod tests {
.tempdir_in(tmpdir_path) .tempdir_in(tmpdir_path)
.unwrap(); .unwrap();
assert!( assert!(s.cleanup_sandbox_storage("").is_err());
s.remove_sandbox_storage(srcdir_path).is_err(),
"Expect Err as the directory is not a mountpoint"
);
assert!(s.remove_sandbox_storage("").is_err());
let invalid_dir = emptydir.path().join("invalid"); let invalid_dir = emptydir.path().join("invalid");
assert!(s assert!(s
.remove_sandbox_storage(invalid_dir.to_str().unwrap()) .cleanup_sandbox_storage(invalid_dir.to_str().unwrap())
.is_err()); .is_ok());
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok()); assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
assert!(s.remove_sandbox_storage(destdir_path).is_ok()); assert!(s.cleanup_sandbox_storage(destdir_path).is_ok());
// remove a directory without umount
s.cleanup_sandbox_storage(srcdir_path).unwrap();
} }
#[tokio::test] #[tokio::test]
@ -573,8 +589,7 @@ mod tests {
let mut s = Sandbox::new(&logger).unwrap(); let mut s = Sandbox::new(&logger).unwrap();
assert!( assert!(
s.unset_and_remove_sandbox_storage("/tmp/testEphePath") s.remove_sandbox_storage("/tmp/testEphePath").await.is_err(),
.is_err(),
"Should fail because sandbox storage doesn't exist" "Should fail because sandbox storage doesn't exist"
); );
@ -595,8 +610,8 @@ mod tests {
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok()); assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
assert!(s.set_sandbox_storage(destdir_path)); s.add_sandbox_storage(destdir_path).await;
assert!(s.unset_and_remove_sandbox_storage(destdir_path).is_ok()); assert!(s.remove_sandbox_storage(destdir_path).await.is_ok());
let other_dir_str; let other_dir_str;
{ {
@ -609,10 +624,10 @@ mod tests {
let other_dir_path = other_dir.path().to_str().unwrap(); let other_dir_path = other_dir.path().to_str().unwrap();
other_dir_str = other_dir_path.to_string(); other_dir_str = other_dir_path.to_string();
assert!(s.set_sandbox_storage(other_dir_path)); s.add_sandbox_storage(other_dir_path).await;
} }
assert!(s.unset_and_remove_sandbox_storage(&other_dir_str).is_err()); assert!(s.remove_sandbox_storage(&other_dir_str).await.is_ok());
} }
#[tokio::test] #[tokio::test]
@ -624,28 +639,30 @@ mod tests {
let storage_path = "/tmp/testEphe"; let storage_path = "/tmp/testEphe";
// Add a new sandbox storage // Add a new sandbox storage
assert!(s.set_sandbox_storage(storage_path)); s.add_sandbox_storage(storage_path).await;
// Use the existing sandbox storage // Use the existing sandbox storage
let state = s.add_sandbox_storage(storage_path).await;
assert!( assert!(
!s.set_sandbox_storage(storage_path), state.ref_count().await > 1,
"Expects false as the storage is not new." "Expects false as the storage is not new."
); );
assert!( assert!(
!s.unset_sandbox_storage(storage_path).unwrap(), !s.remove_sandbox_storage(storage_path).await.unwrap(),
"Expects false as there is still a storage." "Expects false as there is still a storage."
); );
// Reference counter should decrement to 1. // Reference counter should decrement to 1.
let ref_count = s.storages[storage_path]; let storage = &s.storages[storage_path];
let refcount = storage.ref_count().await;
assert_eq!( assert_eq!(
ref_count, 1, refcount, 1,
"Invalid refcount, got {} expected 1.", "Invalid refcount, got {} expected 1.",
ref_count refcount
); );
assert!( assert!(
s.unset_sandbox_storage(storage_path).unwrap(), s.remove_sandbox_storage(storage_path).await.unwrap(),
"Expects true as there is still a storage." "Expects true as there is still a storage."
); );
@ -661,7 +678,7 @@ mod tests {
// If no container is using the sandbox storage, the reference // If no container is using the sandbox storage, the reference
// counter for it should not exist. // counter for it should not exist.
assert!( assert!(
s.unset_sandbox_storage(storage_path).is_err(), s.remove_sandbox_storage(storage_path).await.is_err(),
"Expects false as the reference counter should no exist." "Expects false as the reference counter should no exist."
); );
} }

View File

@ -6,6 +6,7 @@
use anyhow::{anyhow, Context, Error, Result}; use anyhow::{anyhow, Context, Error, Result};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt::Formatter;
use std::{collections::HashMap, fs, path::PathBuf}; use std::{collections::HashMap, fs, path::PathBuf};
/// Prefix to mark a volume as Kata special. /// Prefix to mark a volume as Kata special.
@ -427,6 +428,43 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume {
} }
} }
/// An implementation of generic storage device.
pub struct StorageDeviceGeneric {
refcount: u32,
}
impl std::fmt::Debug for StorageDeviceGeneric {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageState")
.field("refcount", &self.refcount)
.finish()
}
}
impl StorageDeviceGeneric {
/// Create a new instance of `StorageStateCommon`.
pub fn new() -> Self {
StorageDeviceGeneric { refcount: 0 }
}
/// Get reference count.
pub fn ref_count(&self) -> u32 {
self.refcount
}
/// Decrease reference count and return true if it reaches zero.
pub fn inc_ref_count(&mut self) {
self.refcount += 1;
}
/// Decrease reference count and return true if it reaches zero.
pub fn dec_and_test_ref_count(&mut self) -> bool {
assert!(self.refcount > 0);
self.refcount -= 1;
self.refcount == 0
}
}
/// Join user provided volume path with kata direct-volume root path. /// Join user provided volume path with kata direct-volume root path.
/// ///
/// The `volume_path` is base64-encoded and then safely joined to the `prefix` /// The `volume_path` is base64-encoded and then safely joined to the `prefix`
@ -659,4 +697,18 @@ mod tests {
); );
assert_eq!(volume.fs_type.as_str(), "rafsv6") assert_eq!(volume.fs_type.as_str(), "rafsv6")
} }
#[test]
fn test_storage_state_common() {
let mut state = StorageDeviceGeneric::new();
assert_eq!(state.ref_count(), 0);
state.inc_ref_count();
assert_eq!(state.ref_count(), 1);
state.inc_ref_count();
assert_eq!(state.ref_count(), 2);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 1);
assert!(state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 0);
}
} }