diff --git a/src/agent/src/mount.rs b/src/agent/src/mount.rs index 1602d7ac43..73fa9b183d 100644 --- a/src/agent/src/mount.rs +++ b/src/agent/src/mount.rs @@ -741,8 +741,8 @@ pub async fn add_storages( { let mut sb = sandbox.lock().await; - let new_storage = sb.set_sandbox_storage(&storage.mount_point); - if !new_storage { + let state = sb.add_sandbox_storage(&storage.mount_point).await; + if state.ref_count().await > 1 { continue; } } @@ -780,7 +780,7 @@ pub async fn add_storages( "add_storages failed, storage: {:?}, error: {:?} ", storage, e ); let mut sb = sandbox.lock().await; - if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) { + if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await { warn!(logger, "fail to unset sandbox storage {:?}", e); } return Err(e); @@ -1884,7 +1884,7 @@ mod tests { for (i, d) in tests.iter().enumerate() { let msg = format!("test[{}]: {:?}", i, d); - let result = parse_mount_options(&d.options_vec)?; + let result = parse_mount_options(&d.options_vec).unwrap(); let msg = format!("{}: result: {:?}", msg, result); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index e1b15d28c6..8210c0722e 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -308,7 +308,7 @@ impl AgentService { if let Err(e) = ctr.destroy().await { error!(sl(), "failed to destroy container: {:?}", e); } - if let Err(e) = remove_container_resources(&mut s, &cid) { + if let Err(e) = remove_container_resources(&mut s, &cid).await { error!(sl(), "failed to remove container resources: {:?}", e); } return Err(err); @@ -360,7 +360,7 @@ impl AgentService { .ok_or_else(|| anyhow!("Invalid container id"))? .destroy() .await?; - remove_container_resources(&mut sandbox, &cid)?; + remove_container_resources(&mut sandbox, &cid).await?; return Ok(()); } @@ -382,7 +382,7 @@ impl AgentService { .await .map_err(|_| anyhow!(nix::Error::ETIME))???; - remove_container_resources(&mut *self.sandbox.lock().await, &cid) + remove_container_resources(&mut *self.sandbox.lock().await, &cid).await } #[instrument] @@ -1679,7 +1679,7 @@ fn update_container_namespaces( Ok(()) } -fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> { +async fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> { let mut cmounts: Vec = vec![]; // Find the sandbox storage used by this container @@ -1693,7 +1693,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> { } for m in cmounts.iter() { - if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) { + if let Err(err) = sandbox.remove_sandbox_storage(m).await { error!( sl(), "failed to unset_and_remove_sandbox_storage for container {}, error: {:?}", diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 473c7cee7f..7ffd81b56b 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 // +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fs; use std::os::unix::fs::PermissionsExt; @@ -13,6 +14,7 @@ use std::{thread, time}; use anyhow::{anyhow, Context, Result}; use kata_types::cpu::CpuSet; +use kata_types::mount::StorageDeviceGeneric; use libc::pid_t; use oci::{Hook, Hooks}; use protocols::agent::OnlineCPUMemRequest; @@ -28,7 +30,7 @@ use tokio::sync::Mutex; use tracing::instrument; use crate::linux_abi::*; -use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; +use crate::mount::{get_mount_fs_type, is_mounted, remove_mounts, TYPE_ROOTFS}; use crate::namespace::Namespace; use crate::netlink::Handle; use crate::network::Network; @@ -40,6 +42,31 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id"; type UeventWatcher = (Box, oneshot::Sender); +#[derive(Clone, Debug)] +pub struct StorageState { + inner: Arc>, +} + +impl StorageState { + fn new() -> Self { + StorageState { + inner: Arc::new(Mutex::new(StorageDeviceGeneric::new())), + } + } + + pub async fn ref_count(&self) -> u32 { + self.inner.lock().await.ref_count() + } + + async fn inc_ref_count(&self) { + self.inner.lock().await.inc_ref_count() + } + + async fn dec_and_test_ref_count(&self) -> bool { + self.inner.lock().await.dec_and_test_ref_count() + } +} + #[derive(Debug)] pub struct Sandbox { pub logger: Logger, @@ -54,7 +81,7 @@ pub struct Sandbox { pub shared_utsns: Namespace, pub shared_ipcns: Namespace, pub sandbox_pidns: Option, - pub storages: HashMap, + pub storages: HashMap, pub running: bool, pub no_pivot_root: bool, pub sender: Option>, @@ -100,52 +127,38 @@ impl Sandbox { }) } - // set_sandbox_storage sets the sandbox level reference - // counter for the sandbox storage. - // This method also returns a boolean to let - // callers know if the storage already existed or not. - // It will return true if storage is new. + /// Add a new storage object or increase reference count of existing one. + /// The caller may detect new storage object by checking `StorageState.refcount == 1`. #[instrument] - pub fn set_sandbox_storage(&mut self, path: &str) -> bool { - match self.storages.get_mut(path) { - None => { - self.storages.insert(path.to_string(), 1); - true + pub async fn add_sandbox_storage(&mut self, path: &str) -> StorageState { + match self.storages.entry(path.to_string()) { + Entry::Occupied(e) => { + let state = e.get().clone(); + state.inc_ref_count().await; + state } - Some(count) => { - *count += 1; - false + Entry::Vacant(e) => { + let state = StorageState::new(); + e.insert(state.clone()); + state } } } - // unset_sandbox_storage will decrement the sandbox storage - // reference counter. If there aren't any containers using - // that sandbox storage, this method will remove the - // storage reference from the sandbox and return 'true' to - // let the caller know that they can clean up the storage - // related directories by calling remove_sandbox_storage + // Clean mount and directory of a mountpoint. #[instrument] - pub fn unset_sandbox_storage(&mut self, path: &str) -> Result { - match self.storages.get_mut(path) { - None => Err(anyhow!("Sandbox storage with path {} not found", path)), - Some(count) => { - *count -= 1; - if *count == 0 { - self.storages.remove(path); - return Ok(true); - } - Ok(false) - } + fn cleanup_sandbox_storage(&mut self, path: &str) -> Result<()> { + if path.is_empty() { + return Err(anyhow!("mountpoint path is empty")); + } else if !Path::new(path).exists() { + return Ok(()); + } + + if matches!(is_mounted(path), Ok(true)) { + let mounts = vec![path.to_string()]; + remove_mounts(&mounts)?; } - } - // remove_sandbox_storage removes the sandbox storage if no - // containers are using that storage. - #[instrument] - pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> { - let mounts = vec![path.to_string()]; - remove_mounts(&mounts)?; // "remove_dir" will fail if the mount point is backed by a read-only filesystem. // This is the case with the device mapper snapshotter, where we mount the block device directly // at the underlying sandbox path which was provided from the base RO kataShared path from the host. @@ -155,16 +168,23 @@ impl Sandbox { Ok(()) } - // unset_and_remove_sandbox_storage unsets the storage from sandbox - // and if there are no containers using this storage it will - // remove it from the sandbox. + /// Decrease reference count and destroy the storage object if reference count reaches zero. + /// Returns `Ok(true)` if the reference count has reached zero and the storage object has been + /// removed. #[instrument] - pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> { - if self.unset_sandbox_storage(path)? { - return self.remove_sandbox_storage(path); + pub async fn remove_sandbox_storage(&mut self, path: &str) -> Result { + match self.storages.get(path) { + None => Err(anyhow!("Sandbox storage with path {} not found", path)), + Some(state) => { + if state.dec_and_test_ref_count().await { + self.storages.remove(path); + self.cleanup_sandbox_storage(path)?; + Ok(true) + } else { + Ok(false) + } + } } - - Ok(()) } #[instrument] @@ -493,24 +513,22 @@ mod tests { let tmpdir_path = tmpdir.path().to_str().unwrap(); // Add a new sandbox storage - let new_storage = s.set_sandbox_storage(tmpdir_path); + let new_storage = s.add_sandbox_storage(tmpdir_path).await; // Check the reference counter - let ref_count = s.storages[tmpdir_path]; + let ref_count = new_storage.ref_count().await; assert_eq!( ref_count, 1, "Invalid refcount, got {} expected 1.", ref_count ); - assert!(new_storage); // Use the existing sandbox storage - let new_storage = s.set_sandbox_storage(tmpdir_path); - assert!(!new_storage, "Should be false as already exists."); + let new_storage = s.add_sandbox_storage(tmpdir_path).await; // Since we are using existing storage, the reference counter // should be 2 by now. - let ref_count = s.storages[tmpdir_path]; + let ref_count = new_storage.ref_count().await; assert_eq!( ref_count, 2, "Invalid refcount, got {} expected 2.", @@ -546,22 +564,20 @@ mod tests { .tempdir_in(tmpdir_path) .unwrap(); - assert!( - s.remove_sandbox_storage(srcdir_path).is_err(), - "Expect Err as the directory is not a mountpoint" - ); - - assert!(s.remove_sandbox_storage("").is_err()); + assert!(s.cleanup_sandbox_storage("").is_err()); let invalid_dir = emptydir.path().join("invalid"); assert!(s - .remove_sandbox_storage(invalid_dir.to_str().unwrap()) - .is_err()); + .cleanup_sandbox_storage(invalid_dir.to_str().unwrap()) + .is_ok()); assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok()); - assert!(s.remove_sandbox_storage(destdir_path).is_ok()); + assert!(s.cleanup_sandbox_storage(destdir_path).is_ok()); + + // remove a directory without umount + s.cleanup_sandbox_storage(srcdir_path).unwrap(); } #[tokio::test] @@ -573,8 +589,7 @@ mod tests { let mut s = Sandbox::new(&logger).unwrap(); assert!( - s.unset_and_remove_sandbox_storage("/tmp/testEphePath") - .is_err(), + s.remove_sandbox_storage("/tmp/testEphePath").await.is_err(), "Should fail because sandbox storage doesn't exist" ); @@ -595,8 +610,8 @@ mod tests { assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok()); - assert!(s.set_sandbox_storage(destdir_path)); - assert!(s.unset_and_remove_sandbox_storage(destdir_path).is_ok()); + s.add_sandbox_storage(destdir_path).await; + assert!(s.remove_sandbox_storage(destdir_path).await.is_ok()); let other_dir_str; { @@ -609,10 +624,10 @@ mod tests { let other_dir_path = other_dir.path().to_str().unwrap(); other_dir_str = other_dir_path.to_string(); - assert!(s.set_sandbox_storage(other_dir_path)); + s.add_sandbox_storage(other_dir_path).await; } - assert!(s.unset_and_remove_sandbox_storage(&other_dir_str).is_err()); + assert!(s.remove_sandbox_storage(&other_dir_str).await.is_ok()); } #[tokio::test] @@ -624,28 +639,30 @@ mod tests { let storage_path = "/tmp/testEphe"; // Add a new sandbox storage - assert!(s.set_sandbox_storage(storage_path)); + s.add_sandbox_storage(storage_path).await; // Use the existing sandbox storage + let state = s.add_sandbox_storage(storage_path).await; assert!( - !s.set_sandbox_storage(storage_path), + state.ref_count().await > 1, "Expects false as the storage is not new." ); assert!( - !s.unset_sandbox_storage(storage_path).unwrap(), + !s.remove_sandbox_storage(storage_path).await.unwrap(), "Expects false as there is still a storage." ); // Reference counter should decrement to 1. - let ref_count = s.storages[storage_path]; + let storage = &s.storages[storage_path]; + let refcount = storage.ref_count().await; assert_eq!( - ref_count, 1, + refcount, 1, "Invalid refcount, got {} expected 1.", - ref_count + refcount ); assert!( - s.unset_sandbox_storage(storage_path).unwrap(), + s.remove_sandbox_storage(storage_path).await.unwrap(), "Expects true as there is still a storage." ); @@ -661,7 +678,7 @@ mod tests { // If no container is using the sandbox storage, the reference // counter for it should not exist. assert!( - s.unset_sandbox_storage(storage_path).is_err(), + s.remove_sandbox_storage(storage_path).await.is_err(), "Expects false as the reference counter should no exist." ); } diff --git a/src/libs/kata-types/src/mount.rs b/src/libs/kata-types/src/mount.rs index 473e7d5c08..91a061797a 100644 --- a/src/libs/kata-types/src/mount.rs +++ b/src/libs/kata-types/src/mount.rs @@ -6,6 +6,7 @@ use anyhow::{anyhow, Context, Error, Result}; use std::convert::TryFrom; +use std::fmt::Formatter; use std::{collections::HashMap, fs, path::PathBuf}; /// Prefix to mark a volume as Kata special. @@ -427,6 +428,43 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume { } } +/// An implementation of generic storage device. +pub struct StorageDeviceGeneric { + refcount: u32, +} + +impl std::fmt::Debug for StorageDeviceGeneric { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StorageState") + .field("refcount", &self.refcount) + .finish() + } +} + +impl StorageDeviceGeneric { + /// Create a new instance of `StorageStateCommon`. + pub fn new() -> Self { + StorageDeviceGeneric { refcount: 0 } + } + + /// Get reference count. + pub fn ref_count(&self) -> u32 { + self.refcount + } + + /// Decrease reference count and return true if it reaches zero. + pub fn inc_ref_count(&mut self) { + self.refcount += 1; + } + + /// Decrease reference count and return true if it reaches zero. + pub fn dec_and_test_ref_count(&mut self) -> bool { + assert!(self.refcount > 0); + self.refcount -= 1; + self.refcount == 0 + } +} + /// Join user provided volume path with kata direct-volume root path. /// /// The `volume_path` is base64-encoded and then safely joined to the `prefix` @@ -659,4 +697,18 @@ mod tests { ); assert_eq!(volume.fs_type.as_str(), "rafsv6") } + + #[test] + fn test_storage_state_common() { + let mut state = StorageDeviceGeneric::new(); + assert_eq!(state.ref_count(), 0); + state.inc_ref_count(); + assert_eq!(state.ref_count(), 1); + state.inc_ref_count(); + assert_eq!(state.ref_count(), 2); + assert!(!state.dec_and_test_ref_count()); + assert_eq!(state.ref_count(), 1); + assert!(state.dec_and_test_ref_count()); + assert_eq!(state.ref_count(), 0); + } }