runtime-rs: Impl recursive directory copy with metadata preservation

Add async directory traversal using BFS algorithm:
(1) Support file type handling:
Regular files (S_IFREG) with content streaming;
Directories (S_IFDIR) with mode preservation;
Symbolic links (S_IFLNK) with target recreation;
(2) Maintain POSIX metadata:
UID/GID preservation,File mode bits, and Directory permissions
(3) Implement async I/O operations for:
Directory enumeration, file reading, symlink target resolution

Fixes #11237

Signed-off-by: alex.lyn <alex.lyn@antgroup.com>
This commit is contained in:
alex.lyn 2025-05-07 16:10:00 +08:00
parent 9a03815f18
commit 8da7cd1611

View File

@ -5,6 +5,7 @@
//
use std::{
collections::VecDeque,
fs::File,
io::Read,
os::unix::fs::MetadataExt,
@ -18,6 +19,8 @@ use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use hypervisor::device::device_manager::DeviceManager;
use kata_sys_util::mount::{get_mount_options, get_mount_path, get_mount_type};
use nix::sys::stat::SFlag;
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use super::Volume;
@ -307,6 +310,130 @@ impl Volume for ShareFsVolume {
}
}
#[allow(dead_code)]
async fn copy_dir_recursively<P: AsRef<Path>>(
src_dir: P,
dest_dir: &str,
agent: &Arc<dyn Agent>,
) -> Result<()> {
let mut queue = VecDeque::new();
queue.push_back((src_dir.as_ref().to_path_buf(), dest_dir.to_string()));
while let Some((current_src, current_dest)) = queue.pop_front() {
let mut entries = tokio::fs::read_dir(&current_src)
.await
.context(format!("read directory: {:?}", current_src))?;
while let Some(entry) = entries
.next_entry()
.await
.context(format!("read directory entry in {:?}", current_src))?
{
let entry_path = entry.path();
let file_name = entry_path
.file_name()
.ok_or_else(|| anyhow!("get file name for {:?}", entry_path))?
.to_string_lossy()
.to_string();
let dest_path = format!("{}/{}", current_dest, file_name);
let metadata = entry
.metadata()
.await
.context(format!("read metadata for {:?}", entry_path))?;
if metadata.is_symlink() {
// handle symlinks
let entry_path_err = entry_path.clone();
let entry_path_clone = entry_path.clone();
let link_target =
tokio::task::spawn_blocking(move || std::fs::read_link(&entry_path_clone))
.await
.context(format!(
"failed to spawn blocking task for symlink: {:?}",
entry_path_err
))??;
let link_target_str = link_target.to_string_lossy().into_owned();
let symlink_request = agent::CopyFileRequest {
path: dest_path.clone(),
file_size: link_target_str.len() as i64,
uid: metadata.uid() as i32,
gid: metadata.gid() as i32,
file_mode: SFlag::S_IFLNK.bits(),
data: link_target_str.clone().into_bytes(),
..Default::default()
};
info!(
sl!(),
"copying symlink_request {:?} in sandbox with file_mode: {:?}",
dest_path.clone(),
symlink_request.file_mode
);
agent.copy_file(symlink_request).await.context(format!(
"failed to create symlink: {:?} -> {:?}",
dest_path, link_target_str
))?;
} else if metadata.is_dir() {
// handle directory
let dir_request = agent::CopyFileRequest {
path: dest_path.clone(),
file_size: 0,
uid: metadata.uid() as i32,
gid: metadata.gid() as i32,
dir_mode: metadata.mode(),
file_mode: SFlag::S_IFDIR.bits(),
data: vec![],
..Default::default()
};
info!(
sl!(),
"copying subdirectory {:?} in sandbox with file_mode: {:?}",
dir_request.path,
dir_request.file_mode
);
agent
.copy_file(dir_request)
.await
.context(format!("Failed to create subdirectory: {:?}", dest_path))?;
// push back the sub-dir into queue to handle it in time
queue.push_back((entry_path, dest_path));
} else if metadata.is_file() {
// async read file
let mut file = tokio::fs::File::open(&entry_path)
.await
.context(format!("open file: {:?}", entry_path))?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)
.await
.context(format!("read file: {:?}", entry_path))?;
let file_request = agent::CopyFileRequest {
path: dest_path.clone(),
file_size: metadata.len() as i64,
uid: metadata.uid() as i32,
gid: metadata.gid() as i32,
file_mode: SFlag::S_IFREG.bits(),
data: buffer,
..Default::default()
};
info!(sl!(), "copy file {:?} to guest", dest_path.clone());
agent
.copy_file(file_request)
.await
.context(format!("copy file: {:?} -> {:?}", entry_path, dest_path))?;
}
}
}
Ok(())
}
pub(crate) fn is_share_fs_volume(m: &oci::Mount) -> bool {
let mount_type = get_mount_type(m);
(mount_type == "bind" || mount_type == mount::KATA_EPHEMERAL_VOLUME_TYPE)