diff --git a/src/agent/src/storage/mod.rs b/src/agent/src/storage/mod.rs
index ef80193b51..bb6695401a 100644
--- a/src/agent/src/storage/mod.rs
+++ b/src/agent/src/storage/mod.rs
@@ -159,87 +159,86 @@ lazy_static! {
     };
 }
 
+/// Result of multi-layer storage handling
+struct MultiLayerProcessResult {
+    /// The primary device created
+    device: Arc<dyn StorageDevice>,
+    /// All mount points that were processed as part of this group
+    processed_mount_points: Vec<String>,
+}
+
+/// Handle multi-layer storage by creating the overlay device.
+/// Returns None if the storage is not a multi-layer storage.
+/// Returns Some(Ok(result)) if successfully processed.
+/// Returns Some(Err(e)) if there was an error.
 async fn handle_multi_layer_storage(
     logger: &Logger,
     storage: &Storage,
     storages: &[Storage],
     sandbox: &Arc<Mutex<Sandbox>>,
     cid: &Option<String>,
-    processed_multi_layer_mount_points: &mut HashSet<String>,
-    mount_list: &mut Vec<String>,
-) -> Result<bool> {
+    processed_mount_points: &HashSet<String>,
+) -> Result<Option<MultiLayerProcessResult>> {
     if !is_multi_layer_storage(storage) {
-        return Ok(false);
+        return Ok(None);
     }
 
-    if processed_multi_layer_mount_points.contains(&storage.mount_point) {
-        return Ok(true);
+    // Skip if already processed as part of a previous multi-layer group
+    if processed_mount_points.contains(&storage.mount_point) {
+        return Ok(None);
     }
 
     slog::info!(
         logger,
-        "processing multi-layer EROFS storage from unified loop";
+        "processing multi-layer EROFS storage";
         "mount-point" => &storage.mount_point,
        "source" => &storage.source,
        "driver" => &storage.driver,
        "fstype" => &storage.fstype,
    );
 
-    let result = match handle_multi_layer_erofs_group(storage, storages, cid, sandbox, logger).await
+    let result = handle_multi_layer_erofs_group(storage, storages, cid, sandbox, logger).await?;
+
+    // Create device for the mount point
+    let device = new_device(result.mount_point.clone())?;
+
+    Ok(Some(MultiLayerProcessResult {
+        device,
+        processed_mount_points: result.processed_mount_points,
+    }))
+}
+
+/// Update sandbox storage with the created device.
+/// Handles cleanup on failure.
+async fn update_storage_device(
+    sandbox: &Arc<Mutex<Sandbox>>,
+    mount_point: &str,
+    device: Arc<dyn StorageDevice>,
+    logger: &Logger,
+) -> Result<()> {
+    if let Err(device) = sandbox
+        .lock()
+        .await
+        .update_sandbox_storage(mount_point, device)
     {
-        Ok(r) => r,
-        Err(e) => {
-            error!(logger, "failed to handle multi-layer EROFS: {:?}", e);
-            return Err(e);
-        }
-    };
-
-    for processed_mount_point in &result.processed_mount_points {
-        processed_multi_layer_mount_points.insert(processed_mount_point.clone());
-    }
-
-    for grouped_storage in storages {
-        if !result
-            .processed_mount_points
-            .iter()
-            .any(|mp| mp == &grouped_storage.mount_point)
-        {
-            continue;
-        }
-
-        let path = grouped_storage.mount_point.clone();
-        let state = sandbox
+        error!(logger, "failed to update device for storage"; "mount-point" => mount_point);
+        if let Err(e) = sandbox
             .lock()
             .await
-            .add_sandbox_storage(&path, grouped_storage.shared)
-            .await;
-
-        if state.ref_count().await > 1 {
-            continue;
-        }
-
-        let device = new_device(result.mount_point.clone())?;
-        if let Err(device) = sandbox
-            .lock()
+            .remove_sandbox_storage(mount_point)
             .await
-            .update_sandbox_storage(&path, device.clone())
         {
-            error!(logger, "failed to update device for multi-layer storage");
-            if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
-                warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
-            }
-            if let Err(e) = device.cleanup() {
-                error!(
-                    logger,
-                    "failed to clean state for multi-layer storage device {}, {}", path, e
-                );
-            }
-            return Err(anyhow!("failed to update device for multi-layer storage"));
+            warn!(logger, "failed to remove dummy sandbox storage"; "error" => ?e);
        }
+        if let Err(e) = device.cleanup() {
+            error!(logger, "failed to clean state for storage device"; "mount-point" => mount_point, "error" => ?e);
+        }
+        return Err(anyhow!(
+            "failed to update device for storage: {}",
+            mount_point
+        ));
    }
-
-    mount_list.push(result.mount_point);
-    Ok(true)
+    Ok(())
 }
 
 // add_storages takes a list of storages passed by the caller, and perform the
@@ -254,22 +253,54 @@ pub async fn add_storages(
     cid: Option<String>,
 ) -> Result<Vec<String>> {
     let mut mount_list = Vec::new();
-    let mut processed_multi_layer_mount_points = HashSet::new();
+    let mut processed_mount_points = HashSet::new();
 
     for storage in &storages {
-        if handle_multi_layer_storage(
+        // Try multi-layer storage handling first
+        if let Some(result) = handle_multi_layer_storage(
             &logger,
             storage,
             &storages,
             sandbox,
             &cid,
-            &mut processed_multi_layer_mount_points,
-            &mut mount_list,
+            &processed_mount_points,
         )
         .await?
         {
+            // Register all processed mount points
+            for mp in &result.processed_mount_points {
+                processed_mount_points.insert(mp.clone());
+            }
+
+            // Add sandbox storage for each mount point in the group
+            for mp in &result.processed_mount_points {
+                let state = sandbox
+                    .lock()
+                    .await
+                    .add_sandbox_storage(mp, storage.shared)
+                    .await;
+
+                // Only update device for the first occurrence
+                if state.ref_count().await == 1 {
+                    update_storage_device(sandbox, mp, result.device.clone(), &logger).await?;
+                }
+            }
+
+            // Add the primary mount point to the list
+            if let Some(path) = result.device.path() {
+                if !path.is_empty() {
+                    mount_list.push(path.to_string());
+                }
+            }
             continue;
         }
+
+        // Skip if already processed as part of multi-layer group
+        if processed_mount_points.contains(&storage.mount_point) {
+            continue;
+        }
+
+        // Standard storage handling
         let path = storage.mount_point.clone();
         let state = sandbox
             .lock()
@@ -277,68 +308,48 @@ pub async fn add_storages(
             .add_sandbox_storage(&path, storage.shared)
             .await;
 
         if state.ref_count().await > 1 {
-            if let Some(path) = state.path() {
-                if !path.is_empty() {
-                    mount_list.push(path.to_string());
+            if let Some(p) = state.path() {
+                if !p.is_empty() {
+                    mount_list.push(p.to_string());
                 }
             }
             // The device already exists.
             continue;
        }
 
-        if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
+        // Create device using handler
+        let device = if let Some(handler) = STORAGE_HANDLERS.handler(&storage.driver) {
             let logger =
-                logger.new(o!( "subsystem" => "storage", "storage-type" => storage.driver.clone()));
+                logger.new(o!("subsystem" => "storage", "storage-type" => storage.driver.clone()));
             let mut ctx = StorageContext {
                 cid: &cid,
                 logger: &logger,
                 sandbox,
             };
-
-            match handler.create_device(storage.clone(), &mut ctx).await {
-                Ok(device) => {
-                    match sandbox
-                        .lock()
-                        .await
-                        .update_sandbox_storage(&path, device.clone())
-                    {
-                        Ok(d) => {
-                            if let Some(path) = device.path() {
-                                if !path.is_empty() {
-                                    mount_list.push(path.to_string());
-                                }
-                            }
-                            drop(d);
-                        }
-                        Err(device) => {
-                            error!(logger, "failed to update device for storage");
-                            if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await
-                            {
-                                warn!(logger, "failed to remove dummy sandbox storage {:?}", e);
-                            }
-                            if let Err(e) = device.cleanup() {
-                                error!(
-                                    logger,
-                                    "failed to clean state for storage device {}, {}", path, e
-                                );
-                            }
-                            return Err(anyhow!("failed to update device for storage"));
-                        }
-                    }
-                }
-                Err(e) => {
-                    error!(logger, "failed to create device for storage, error: {e:?}");
-                    if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
-                        warn!(logger, "failed to remove dummy sandbox storage {e:?}");
-                    }
-                    return Err(e);
-                }
-            }
+            handler.create_device(storage.clone(), &mut ctx).await
         } else {
             return Err(anyhow!(
                 "Failed to find the storage handler {}",
                 storage.driver
             ));
+        };
+
+        match device {
+            Ok(device) => {
+                update_storage_device(sandbox, &path, device.clone(), &logger).await?;
+                if let Some(p) = device.path() {
+                    if !p.is_empty() {
+                        mount_list.push(p.to_string());
+                    }
+                }
+            }
+            Err(e) => {
+                error!(logger, "failed to create device for storage"; "error" => ?e);
+                if let Err(e) = sandbox.lock().await.remove_sandbox_storage(&path).await {
+                    warn!(logger, "failed to remove dummy sandbox storage"; "error" => ?e);
+                }
+                return Err(e);
+            }
         }
     }