diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index a4d0c3342a..91395f1c26 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -1860,6 +1860,28 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.9.0", + "futures-core", + "inotify-sys", + "libc", + "tokio", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -3575,6 +3597,7 @@ dependencies = [ "cgroups-rs", "futures 0.3.28", "hypervisor", + "inotify", "kata-sys-util", "kata-types", "lazy_static", @@ -3599,6 +3622,7 @@ dependencies = [ "tokio", "tracing", "uuid 0.4.0", + "walkdir", ] [[package]] @@ -3806,6 +3830,15 @@ dependencies = [ "libc", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" @@ -5075,6 +5108,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -5220,6 +5263,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/src/runtime-rs/crates/resource/Cargo.toml b/src/runtime-rs/crates/resource/Cargo.toml index 54a8e014aa..0ec974d001 100644 --- a/src/runtime-rs/crates/resource/Cargo.toml +++ b/src/runtime-rs/crates/resource/Cargo.toml @@ -33,6 +33,8 @@ tokio = { workspace = true, features = ["process"] } tracing = { workspace = true } uuid = { version = "0.4", features = ["v4"] } oci-spec = { workspace = true } +inotify = "0.11.0" +walkdir = "2.5.0" ## Dependencies from `rust-netlink` netlink-packet-route = "0.22" diff --git a/src/runtime-rs/crates/resource/src/volume/share_fs_volume.rs b/src/runtime-rs/crates/resource/src/volume/share_fs_volume.rs index 5905f73f67..ba06ea118b 100644 --- a/src/runtime-rs/crates/resource/src/volume/share_fs_volume.rs +++ b/src/runtime-rs/crates/resource/src/volume/share_fs_volume.rs @@ -5,23 +5,30 @@ // use std::{ - collections::VecDeque, + collections::{HashSet, VecDeque}, fs::File, io::Read, os::unix::fs::MetadataExt, path::{Path, PathBuf}, str::FromStr, sync::Arc, + time::Duration, }; use agent::Agent; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use hypervisor::device::device_manager::DeviceManager; +use inotify::{EventMask, Inotify, WatchMask}; use kata_sys_util::mount::{get_mount_options, get_mount_path, get_mount_type}; use nix::sys::stat::SFlag; -use tokio::io::AsyncReadExt; -use tokio::sync::RwLock; +use tokio::{ + io::AsyncReadExt, + sync::{Mutex, RwLock}, + task::JoinHandle, + time::Instant, +}; +use walkdir::WalkDir; use super::Volume; use crate::share_fs::DEFAULT_KATA_GUEST_SANDBOX_DIR; @@ -31,6 +38,8 @@ use kata_types::mount; use oci_spec::runtime as oci; const SYS_MOUNT_PREFIX: [&str; 2] = ["/proc", "/sys"]; +const MONITOR_INTERVAL: Duration = Duration::from_millis(100); +const DEBOUNCE_TIME: Duration = Duration::from_millis(500); // copy file to container's rootfs if filesystem sharing is not supported, otherwise // bind mount it in the shared directory. @@ -42,6 +51,198 @@ pub(crate) struct ShareFsVolume { share_fs: Option>, mounts: Vec, storages: Vec, + monitor_task: Option>, +} + +/// Directory Monitor Config +/// path: the to be watched target directory +/// recursive: recursively monitor sub-dirs or not, +/// follow_symlinks: track symlinks or not, +/// exclude_hidden: exclude hidden files or not, +/// watch_events: Watcher Event types with CREATE/DELETE/MODIFY/MOVED_FROM/MOVED_TO +#[derive(Clone, Debug)] +struct MonitorConfig { + path: PathBuf, + recursive: bool, + follow_symlinks: bool, + exclude_hidden: bool, + watch_events: WatchMask, +} + +impl MonitorConfig { + fn new(path: &Path) -> Self { + Self { + path: path.to_path_buf(), + recursive: true, + follow_symlinks: false, + exclude_hidden: true, + watch_events: WatchMask::CREATE + | WatchMask::DELETE + | WatchMask::MODIFY + | WatchMask::MOVED_FROM + | WatchMask::MOVED_TO + | WatchMask::CLOSE_WRITE, + } + } +} + +#[derive(Clone)] +struct FsWatcher { + config: MonitorConfig, + inotify: Arc>, + watch_dirs: Arc>>, + pending_events: Arc>>, + need_sync: Arc>, +} + +impl FsWatcher { + async fn new(source_path: &Path) -> Result { + let inotify = Inotify::init()?; + let mon_cfg = MonitorConfig::new(source_path); + let mut watcher = Self { + config: mon_cfg, + inotify: Arc::new(Mutex::new(inotify)), + pending_events: Arc::new(Mutex::new(HashSet::new())), + watch_dirs: Arc::new(Mutex::new(HashSet::new())), + need_sync: Arc::new(Mutex::new(false)), + }; + + watcher.add_watchers().await?; + + Ok(watcher) + } + + /// add watched directory recursively + async fn add_watchers(&mut self) -> Result<()> { + let mut watched_dirs = self.watch_dirs.lock().await; + let config: &MonitorConfig = &self.config; + let walker = WalkDir::new(&config.path) + .follow_links(config.follow_symlinks) + .min_depth(0) + .max_depth(if config.recursive { usize::MAX } else { 1 }) + .into_iter() + .filter_entry(|e| { + !(config.exclude_hidden + && e.file_name() + .to_str() + .map(|s| s.starts_with('.')) + .unwrap_or(false)) + }); + + for entry in walker.filter_map(|e| e.ok()) { + if entry.file_type().is_dir() { + let path = entry.path(); + if watched_dirs.insert(path.to_path_buf()) { + self.inotify + .lock() + .await + .watches() + .add(path, config.watch_events)?; // we don't use WatchMask::ALL_EVENTS + } + } + } + + Ok(()) + } + + /// start monitor + pub async fn start_monitor( + &self, + agent: Arc, + src: PathBuf, + dst: PathBuf, + ) -> JoinHandle<()> { + let need_sync = self.need_sync.clone(); + let pending_events = self.pending_events.clone(); + let inotify = self.inotify.clone(); + let monitor_config = self.config.clone(); + + tokio::spawn(async move { + let mut buffer = [0u8; 4096]; + let mut last_event_time = None; + + loop { + // use cloned inotify instance + match inotify.lock().await.read_events(&mut buffer) { + Ok(events) => { + for event in events { + if !event.mask.intersects( + EventMask::CREATE + | EventMask::MODIFY + | EventMask::DELETE + | EventMask::MOVED_FROM + | EventMask::MOVED_TO, + ) { + continue; + } + + if let Some(file_name) = event.name { + let full_path = &monitor_config.path.join(file_name); + let event_types: Vec<&str> = event + .mask + .iter() + .map(|m| match m { + EventMask::CREATE => "CREATE", + EventMask::DELETE => "DELETE", + EventMask::MODIFY => "MODIFY", + EventMask::MOVED_FROM => "MOVED_FROM", + EventMask::MOVED_TO => "MOVED_TO", + EventMask::CLOSE_WRITE => "CLOSE_WRITE", + _ => "OTHER", + }) + .collect(); + + info!( + sl!(), + "handle events [{}] {:?} -> {:?}", + event_types.join("|"), + event.mask, + full_path + ); + pending_events.lock().await.insert(full_path.clone()); + } + } + } + Err(e) => eprintln!("inotify error: {}", e), + } + + // handle events to be synchronized + let events_paths = { + let mut pending = pending_events.lock().await; + pending.drain().collect::>() + }; + if !events_paths.is_empty() { + *need_sync.lock().await = true; + last_event_time = Some(Instant::now()); + } + + // Debounce handling + // It is used to prevent unnecessary repeated copies when file changes are triggered + // multiple times in a short period; we only execute the last one. + if let Some(t) = last_event_time { + if Instant::now().duration_since(t) > DEBOUNCE_TIME && *need_sync.lock().await { + info!(sl!(), "debounce handle copyfile {:?} -> {:?}", &src, &dst); + if let Err(e) = + copy_dir_recursively(&src, &dst.display().to_string(), &agent).await + { + error!( + sl!(), + "debounce handle copyfile {:?} -> {:?} failed with error: {:?}", + &src, + &dst, + e + ); + eprintln!("sync host/guest files failed: {}", e); + } + *need_sync.lock().await = false; + last_event_time = None; + } + } + + tokio::time::sleep(MONITOR_INTERVAL).await; + } + }) + } } impl ShareFsVolume { @@ -65,6 +266,7 @@ impl ShareFsVolume { share_fs: share_fs.as_ref().map(Arc::clone), mounts: vec![], storages: vec![], + monitor_task: None, }; match share_fs { None => { @@ -200,6 +402,13 @@ impl ShareFsVolume { oci_mount.set_source(Some(PathBuf::from(&dest_dir))); oci_mount.set_options(Some(options)); volume.mounts.push(oci_mount); + + // start monitoring + let watcher = FsWatcher::new(Path::new(&source_path)).await?; + let monitor_task = watcher + .start_monitor(agent.clone(), src.clone(), dest_dir.into()) + .await; + volume.monitor_task = Some(monitor_task); } else { // If not, we can ignore it. Let's issue a warning so that the user knows. warn!(