agent: Refactor multi-layer EROFS handling with unified flow

Refactor the multi-layer EROFS storage handling to improve code
maintainability and reduce duplication.

Key changes:
(1) Extract update_storage_device() to unify device state management
  for both multi-layer and standard storages
(2) Simplify handle_multi_layer_storage() to focus on device creation,
  returning MultiLayerProcessResult struct instead of managing state
(3) Unify the processing flow in add_storages() with a clear separation
  between multi-layer and standard storage handling
(4) Support multiple EROFS lower layers with dynamic lower-N mount paths
(5) Improve mkdir directive handling with deferred {{ mount 1 }}
  resolution

This reduces code duplication, improves readability, and makes the
storage handling logic more consistent across different storage types.

Signed-off-by: Alex Lyn <alex.lyn@antgroup.com>
This commit is contained in:
Alex Lyn
2026-04-01 11:34:10 +08:00
parent 47a501be80
commit caf0275645

View File

@@ -159,87 +159,86 @@ lazy_static! {
};
}
/// Result of multi-layer storage handling.
///
/// Returned by `handle_multi_layer_storage` when a storage entry is part of a
/// multi-layer (EROFS) group, so the caller can register the device and mark
/// every mount point in the group as processed.
struct MultiLayerProcessResult {
/// The primary storage device created for the group's overlay mount point.
device: Arc<dyn StorageDevice>,
/// All mount points that were processed as part of this group
/// (includes the lower layers as well as the primary mount point —
/// NOTE(review): confirm against handle_multi_layer_erofs_group).
processed_mount_points: Vec<String>,
}
/// Handle multi-layer storage by creating the overlay device.
/// Returns None if the storage is not a multi-layer storage.
/// Returns Some(Ok(result)) if successfully processed.
/// Returns Some(Err(e)) if there was an error.
async fn handle_multi_layer_storage(
logger: &Logger,
storage: &Storage,
storages: &[Storage],
sandbox: &Arc<Mutex<Sandbox>>,
cid: &Option<String>,
processed_multi_layer_mount_points: &mut HashSet<String>,
mount_list: &mut Vec<String>,
) -> Result<bool> {
processed_mount_points: &HashSet<String>,
) -> Result<Option<MultiLayerProcessResult>> {
if !is_multi_layer_storage(storage) {
return Ok(false);
return Ok(None);
}
if processed_multi_layer_mount_points.contains(&storage.mount_point) {
return Ok(true);
// Skip if already processed as part of a previous multi-layer group
if processed_mount_points.contains(&storage.mount_point) {
return Ok(None);
}
slog::info!(
logger,
"processing multi-layer EROFS storage from unified loop";
"processing multi-layer EROFS storage";
"mount-point" => &storage.mount_point,
"source" => &storage.source,
"driver" => &storage.driver,
"fstype" => &storage.fstype,
);
let result = match handle_multi_layer_erofs_group(storage, storages, cid, sandbox, logger).await
let result = handle_multi_layer_erofs_group(storage, storages, cid, sandbox, logger).await?;
// Create device for the mount point
let device = new_device(result.mount_point.clone())?;
Ok(Some(MultiLayerProcessResult {
device,
processed_mount_points: result.processed_mount_points,
}))
}
/// Update sandbox storage with the created device.
/// Handles cleanup on failure.
async fn update_storage_device(
sandbox: &Arc<Mutex<Sandbox>>,
mount_point: &str,
device: Arc<dyn StorageDevice>,
logger: &Logger,
) -> Result<()> {
if let Err(device) = sandbox
.lock()
.await
.update_sandbox_storage(mount_point, device)
{
Ok(r) => r,
Err(e) => {
error!(logger, "failed to handle multi-layer EROFS: {:?}", e);
return Err(e);
}
};
for processed_mount_point in &result.processed_mount_points {
processed_multi_layer_mount_points.insert(processed_mount_point.clone());
}
for grouped_storage in storages {
if !result
.processed_mount_points
.iter()
.any(|mp| mp == &grouped_storage.mount_point)
{
continue;
}
let path = grouped_storage.mount_point.clone();
let state = sandbox
error!(logger, "failed to update device for storage"; "mount-point" => mount_point);
if let Err(e) = sandbox
.lock()
.await
.add_sandbox_storage(&path, grouped_storage.shared)
.await;
if state.ref_count().await > 1 {
continue;
}
let device = new_device(result.mount_point.clone())?;
if let Err(device) = sandbox
.lock()
.remove_sandbox_storage(mount_point)
.await
.update_sandbox_storage(&path, device.clone())
{
error!(logger, "failed to update device for multi-layer storage");
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
}
if let Err(e) = device.cleanup() {
error!(
logger,
"failed to clean state for multi-layer storage device {}, {}", path, e
);
}
return Err(anyhow!("failed to update device for multi-layer storage"));
warn!(logger, "failed to remove dummy sandbox storage"; "error" => ?e);
}
if let Err(e) = device.cleanup() {
error!(logger, "failed to clean state for storage device"; "mount-point" => mount_point, "error" => ?e);
}
return Err(anyhow!(
"failed to update device for storage: {}",
mount_point
));
}
mount_list.push(result.mount_point);
Ok(true)
Ok(())
}
// add_storages takes a list of storages passed by the caller, and perform the
@@ -254,22 +253,54 @@ pub async fn add_storages(
cid: Option<String>,
) -> Result<Vec<String>> {
let mut mount_list = Vec::new();
let mut processed_multi_layer_mount_points = HashSet::new();
let mut processed_mount_points = HashSet::new();
for storage in &storages {
if handle_multi_layer_storage(
// Try multi-layer storage handling first
if let Some(result) = handle_multi_layer_storage(
&logger,
storage,
&storages,
sandbox,
&cid,
&mut processed_multi_layer_mount_points,
&mut mount_list,
&processed_mount_points,
)
.await?
{
// Register all processed mount points
for mp in &result.processed_mount_points {
processed_mount_points.insert(mp.clone());
}
// Add sandbox storage for each mount point in the group
for mp in &result.processed_mount_points {
let state = sandbox
.lock()
.await
.add_sandbox_storage(mp, storage.shared)
.await;
// Only update device for the first occurrence
if state.ref_count().await == 1 {
update_storage_device(sandbox, mp, result.device.clone(), &logger).await?;
}
}
// Add the primary mount point to the list
if let Some(path) = result.device.path() {
if !path.is_empty() {
mount_list.push(path.to_string());
}
}
continue;
}
// Skip if already processed as part of multi-layer group
if processed_mount_points.contains(&storage.mount_point) {
continue;
}
// Standard storage handling
let path = storage.mount_point.clone();
let state = sandbox
.lock()
@@ -277,68 +308,48 @@ pub async fn add_storages(
.add_sandbox_storage(&path, storage.shared)
.await;
if state.ref_count().await > 1 {
if let Some(path) = state.path() {
if !path.is_empty() {
mount_list.push(path.to_string());
if let Some(p) = state.path() {
if !p.is_empty() {
mount_list.push(p.to_string());
}
}
// The device already exists.
continue;
}
if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
// Create device using handler
let device = if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
let logger =
logger.new(o!( "subsystem" => "storage", "storage-type" => storage.driver.clone()));
logger.new(o!("subsystem" => "storage", "storage-type" => storage.driver.clone()));
let mut ctx = StorageContext {
cid: &cid,
logger: &logger,
sandbox,
};
match handler.create_device(storage.clone(), &mut ctx).await {
Ok(device) => {
match sandbox
.lock()
.await
.update_sandbox_storage(&path, device.clone())
{
Ok(d) => {
if let Some(path) = device.path() {
if !path.is_empty() {
mount_list.push(path.to_string());
}
}
drop(d);
}
Err(device) => {
error!(logger, "failed to update device for storage");
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await
{
warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
}
if let Err(e) = device.cleanup() {
error!(
logger,
"failed to clean state for storage device {}, {}", path, e
);
}
return Err(anyhow!("failed to update device for storage"));
}
}
}
Err(e) => {
error!(logger, "failed to create device for storage, error: {e:?}");
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
warn!(logger, "failed to remove dummy sandbox storage {e:?}");
}
return Err(e);
}
}
handler.create_device(storage.clone(), &mut ctx).await
} else {
return Err(anyhow!(
"Failed to find the storage handler {}",
storage.driver
));
};
match device {
Ok(device) => {
update_storage_device(sandbox, &path, device.clone(), &logger).await?;
if let Some(p) = device.path() {
if !p.is_empty() {
mount_list.push(p.to_string());
}
}
}
Err(e) => {
error!(logger, "failed to create device for storage"; "error" => ?e);
if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
warn!(logger, "failed to remove dummy sandbox storage"; "error" => ?e);
}
return Err(e);
}
}
}