agent: simplify the way to manage storage object

Simplify the way to manage storage objects, and introduce
StorageStateCommon structures for coming extensions.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-08 21:52:02 +08:00
parent 8392c71bf2
commit b03b1f6134
4 changed files with 154 additions and 85 deletions

View File

@ -741,8 +741,8 @@ pub async fn add_storages(
{
let mut sb = sandbox.lock().await;
let new_storage = sb.set_sandbox_storage(&storage.mount_point);
if !new_storage {
let state = sb.add_sandbox_storage(&storage.mount_point).await;
if state.ref_count().await > 1 {
continue;
}
}
@ -780,7 +780,7 @@ pub async fn add_storages(
"add_storages failed, storage: {:?}, error: {:?} ", storage, e
);
let mut sb = sandbox.lock().await;
if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) {
if let Err(e) = sb.remove_sandbox_storage(&storage.mount_point).await {
warn!(logger, "fail to unset sandbox storage {:?}", e);
}
return Err(e);
@ -1884,7 +1884,7 @@ mod tests {
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let result = parse_mount_options(&d.options_vec)?;
let result = parse_mount_options(&d.options_vec).unwrap();
let msg = format!("{}: result: {:?}", msg, result);

View File

@ -308,7 +308,7 @@ impl AgentService {
if let Err(e) = ctr.destroy().await {
error!(sl(), "failed to destroy container: {:?}", e);
}
if let Err(e) = remove_container_resources(&mut s, &cid) {
if let Err(e) = remove_container_resources(&mut s, &cid).await {
error!(sl(), "failed to remove container resources: {:?}", e);
}
return Err(err);
@ -360,7 +360,7 @@ impl AgentService {
.ok_or_else(|| anyhow!("Invalid container id"))?
.destroy()
.await?;
remove_container_resources(&mut sandbox, &cid)?;
remove_container_resources(&mut sandbox, &cid).await?;
return Ok(());
}
@ -382,7 +382,7 @@ impl AgentService {
.await
.map_err(|_| anyhow!(nix::Error::ETIME))???;
remove_container_resources(&mut *self.sandbox.lock().await, &cid)
remove_container_resources(&mut *self.sandbox.lock().await, &cid).await
}
#[instrument]
@ -1679,7 +1679,7 @@ fn update_container_namespaces(
Ok(())
}
fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
async fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
let mut cmounts: Vec<String> = vec![];
// Find the sandbox storage used by this container
@ -1693,7 +1693,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
}
for m in cmounts.iter() {
if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) {
if let Err(err) = sandbox.remove_sandbox_storage(m).await {
error!(
sl(),
"failed to unset_and_remove_sandbox_storage for container {}, error: {:?}",

View File

@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
@ -13,6 +14,7 @@ use std::{thread, time};
use anyhow::{anyhow, Context, Result};
use kata_types::cpu::CpuSet;
use kata_types::mount::StorageDeviceGeneric;
use libc::pid_t;
use oci::{Hook, Hooks};
use protocols::agent::OnlineCPUMemRequest;
@ -28,7 +30,7 @@ use tokio::sync::Mutex;
use tracing::instrument;
use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
use crate::mount::{get_mount_fs_type, is_mounted, remove_mounts, TYPE_ROOTFS};
use crate::namespace::Namespace;
use crate::netlink::Handle;
use crate::network::Network;
@ -40,6 +42,31 @@ pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
#[derive(Clone, Debug)]
pub struct StorageState {
inner: Arc<Mutex<StorageDeviceGeneric>>,
}
impl StorageState {
fn new() -> Self {
StorageState {
inner: Arc::new(Mutex::new(StorageDeviceGeneric::new())),
}
}
pub async fn ref_count(&self) -> u32 {
self.inner.lock().await.ref_count()
}
async fn inc_ref_count(&self) {
self.inner.lock().await.inc_ref_count()
}
async fn dec_and_test_ref_count(&self) -> bool {
self.inner.lock().await.dec_and_test_ref_count()
}
}
#[derive(Debug)]
pub struct Sandbox {
pub logger: Logger,
@ -54,7 +81,7 @@ pub struct Sandbox {
pub shared_utsns: Namespace,
pub shared_ipcns: Namespace,
pub sandbox_pidns: Option<Namespace>,
pub storages: HashMap<String, u32>,
pub storages: HashMap<String, StorageState>,
pub running: bool,
pub no_pivot_root: bool,
pub sender: Option<tokio::sync::oneshot::Sender<i32>>,
@ -100,52 +127,38 @@ impl Sandbox {
})
}
// set_sandbox_storage sets the sandbox level reference
// counter for the sandbox storage.
// This method also returns a boolean to let
// callers know if the storage already existed or not.
// It will return true if storage is new.
/// Add a new storage object or increase reference count of existing one.
/// The caller may detect new storage object by checking `StorageState.refcount == 1`.
#[instrument]
pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
match self.storages.get_mut(path) {
None => {
self.storages.insert(path.to_string(), 1);
true
pub async fn add_sandbox_storage(&mut self, path: &str) -> StorageState {
match self.storages.entry(path.to_string()) {
Entry::Occupied(e) => {
let state = e.get().clone();
state.inc_ref_count().await;
state
}
Some(count) => {
*count += 1;
false
Entry::Vacant(e) => {
let state = StorageState::new();
e.insert(state.clone());
state
}
}
}
// unset_sandbox_storage will decrement the sandbox storage
// reference counter. If there aren't any containers using
// that sandbox storage, this method will remove the
// storage reference from the sandbox and return 'true' to
// let the caller know that they can clean up the storage
// related directories by calling remove_sandbox_storage
// Clean mount and directory of a mountpoint.
#[instrument]
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
match self.storages.get_mut(path) {
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
Some(count) => {
*count -= 1;
if *count == 0 {
self.storages.remove(path);
return Ok(true);
}
Ok(false)
}
fn cleanup_sandbox_storage(&mut self, path: &str) -> Result<()> {
if path.is_empty() {
return Err(anyhow!("mountpoint path is empty"));
} else if !Path::new(path).exists() {
return Ok(());
}
if matches!(is_mounted(path), Ok(true)) {
let mounts = vec![path.to_string()];
remove_mounts(&mounts)?;
}
}
// remove_sandbox_storage removes the sandbox storage if no
// containers are using that storage.
#[instrument]
pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
let mounts = vec![path.to_string()];
remove_mounts(&mounts)?;
// "remove_dir" will fail if the mount point is backed by a read-only filesystem.
// This is the case with the device mapper snapshotter, where we mount the block device directly
// at the underlying sandbox path which was provided from the base RO kataShared path from the host.
@ -155,16 +168,23 @@ impl Sandbox {
Ok(())
}
// unset_and_remove_sandbox_storage unsets the storage from sandbox
// and if there are no containers using this storage it will
// remove it from the sandbox.
/// Decrease reference count and destroy the storage object if reference count reaches zero.
/// Returns `Ok(true)` if the reference count has reached zero and the storage object has been
/// removed.
#[instrument]
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
if self.unset_sandbox_storage(path)? {
return self.remove_sandbox_storage(path);
pub async fn remove_sandbox_storage(&mut self, path: &str) -> Result<bool> {
match self.storages.get(path) {
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
Some(state) => {
if state.dec_and_test_ref_count().await {
self.storages.remove(path);
self.cleanup_sandbox_storage(path)?;
Ok(true)
} else {
Ok(false)
}
}
}
Ok(())
}
#[instrument]
@ -493,24 +513,22 @@ mod tests {
let tmpdir_path = tmpdir.path().to_str().unwrap();
// Add a new sandbox storage
let new_storage = s.set_sandbox_storage(tmpdir_path);
let new_storage = s.add_sandbox_storage(tmpdir_path).await;
// Check the reference counter
let ref_count = s.storages[tmpdir_path];
let ref_count = new_storage.ref_count().await;
assert_eq!(
ref_count, 1,
"Invalid refcount, got {} expected 1.",
ref_count
);
assert!(new_storage);
// Use the existing sandbox storage
let new_storage = s.set_sandbox_storage(tmpdir_path);
assert!(!new_storage, "Should be false as already exists.");
let new_storage = s.add_sandbox_storage(tmpdir_path).await;
// Since we are using existing storage, the reference counter
// should be 2 by now.
let ref_count = s.storages[tmpdir_path];
let ref_count = new_storage.ref_count().await;
assert_eq!(
ref_count, 2,
"Invalid refcount, got {} expected 2.",
@ -546,22 +564,20 @@ mod tests {
.tempdir_in(tmpdir_path)
.unwrap();
assert!(
s.remove_sandbox_storage(srcdir_path).is_err(),
"Expect Err as the directory is not a mountpoint"
);
assert!(s.remove_sandbox_storage("").is_err());
assert!(s.cleanup_sandbox_storage("").is_err());
let invalid_dir = emptydir.path().join("invalid");
assert!(s
.remove_sandbox_storage(invalid_dir.to_str().unwrap())
.is_err());
.cleanup_sandbox_storage(invalid_dir.to_str().unwrap())
.is_ok());
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
assert!(s.remove_sandbox_storage(destdir_path).is_ok());
assert!(s.cleanup_sandbox_storage(destdir_path).is_ok());
// remove a directory without umount
s.cleanup_sandbox_storage(srcdir_path).unwrap();
}
#[tokio::test]
@ -573,8 +589,7 @@ mod tests {
let mut s = Sandbox::new(&logger).unwrap();
assert!(
s.unset_and_remove_sandbox_storage("/tmp/testEphePath")
.is_err(),
s.remove_sandbox_storage("/tmp/testEphePath").await.is_err(),
"Should fail because sandbox storage doesn't exist"
);
@ -595,8 +610,8 @@ mod tests {
assert!(bind_mount(srcdir_path, destdir_path, &logger).is_ok());
assert!(s.set_sandbox_storage(destdir_path));
assert!(s.unset_and_remove_sandbox_storage(destdir_path).is_ok());
s.add_sandbox_storage(destdir_path).await;
assert!(s.remove_sandbox_storage(destdir_path).await.is_ok());
let other_dir_str;
{
@ -609,10 +624,10 @@ mod tests {
let other_dir_path = other_dir.path().to_str().unwrap();
other_dir_str = other_dir_path.to_string();
assert!(s.set_sandbox_storage(other_dir_path));
s.add_sandbox_storage(other_dir_path).await;
}
assert!(s.unset_and_remove_sandbox_storage(&other_dir_str).is_err());
assert!(s.remove_sandbox_storage(&other_dir_str).await.is_ok());
}
#[tokio::test]
@ -624,28 +639,30 @@ mod tests {
let storage_path = "/tmp/testEphe";
// Add a new sandbox storage
assert!(s.set_sandbox_storage(storage_path));
s.add_sandbox_storage(storage_path).await;
// Use the existing sandbox storage
let state = s.add_sandbox_storage(storage_path).await;
assert!(
!s.set_sandbox_storage(storage_path),
state.ref_count().await > 1,
"Expects false as the storage is not new."
);
assert!(
!s.unset_sandbox_storage(storage_path).unwrap(),
!s.remove_sandbox_storage(storage_path).await.unwrap(),
"Expects false as there is still a storage."
);
// Reference counter should decrement to 1.
let ref_count = s.storages[storage_path];
let storage = &s.storages[storage_path];
let refcount = storage.ref_count().await;
assert_eq!(
ref_count, 1,
refcount, 1,
"Invalid refcount, got {} expected 1.",
ref_count
refcount
);
assert!(
s.unset_sandbox_storage(storage_path).unwrap(),
s.remove_sandbox_storage(storage_path).await.unwrap(),
"Expects true as there is still a storage."
);
@ -661,7 +678,7 @@ mod tests {
// If no container is using the sandbox storage, the reference
// counter for it should not exist.
assert!(
s.unset_sandbox_storage(storage_path).is_err(),
s.remove_sandbox_storage(storage_path).await.is_err(),
"Expects false as the reference counter should no exist."
);
}

View File

@ -6,6 +6,7 @@
use anyhow::{anyhow, Context, Error, Result};
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::{collections::HashMap, fs, path::PathBuf};
/// Prefix to mark a volume as Kata special.
@ -427,6 +428,43 @@ impl TryFrom<&NydusExtraOptions> for KataVirtualVolume {
}
}
/// An implementation of generic storage device.
pub struct StorageDeviceGeneric {
refcount: u32,
}
impl std::fmt::Debug for StorageDeviceGeneric {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageState")
.field("refcount", &self.refcount)
.finish()
}
}
impl StorageDeviceGeneric {
/// Create a new instance of `StorageStateCommon`.
pub fn new() -> Self {
StorageDeviceGeneric { refcount: 0 }
}
/// Get reference count.
pub fn ref_count(&self) -> u32 {
self.refcount
}
/// Decrease reference count and return true if it reaches zero.
pub fn inc_ref_count(&mut self) {
self.refcount += 1;
}
/// Decrease reference count and return true if it reaches zero.
pub fn dec_and_test_ref_count(&mut self) -> bool {
assert!(self.refcount > 0);
self.refcount -= 1;
self.refcount == 0
}
}
/// Join user provided volume path with kata direct-volume root path.
///
/// The `volume_path` is base64-encoded and then safely joined to the `prefix`
@ -659,4 +697,18 @@ mod tests {
);
assert_eq!(volume.fs_type.as_str(), "rafsv6")
}
#[test]
fn test_storage_state_common() {
let mut state = StorageDeviceGeneric::new();
assert_eq!(state.ref_count(), 0);
state.inc_ref_count();
assert_eq!(state.ref_count(), 1);
state.inc_ref_count();
assert_eq!(state.ref_count(), 2);
assert!(!state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 1);
assert!(state.dec_and_test_ref_count());
assert_eq!(state.ref_count(), 0);
}
}