mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-31 08:28:34 +00:00
runtime-rs: Add inotify-based real-time directory synchronization
Introduce event-driven file sync mechanism between host and guest when sharedfs is disabled, which will help monitor the host path in time and do sync files changes: 1. Introduce FsWatcher to monitor directory changes via inotify; 2. Support recursive watching with configurable filters; 3. Add debounce logic (default 500ms cooldown) to handle burst events; 4. Trigger `copy_dir_recursively` on stable state; 5. Handle CREATE/MODIFY/DELETE/MOVED/CLOSE_WRITE events; Fixes #11237 Signed-off-by: alex.lyn <alex.lyn@antgroup.com>
This commit is contained in:
52
src/runtime-rs/Cargo.lock
generated
52
src/runtime-rs/Cargo.lock
generated
@@ -1860,6 +1860,28 @@ dependencies = [
|
||||
"hashbrown 0.15.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
|
||||
dependencies = [
|
||||
"bitflags 2.9.0",
|
||||
"futures-core",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify-sys"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
@@ -3575,6 +3597,7 @@ dependencies = [
|
||||
"cgroups-rs",
|
||||
"futures 0.3.28",
|
||||
"hypervisor",
|
||||
"inotify",
|
||||
"kata-sys-util",
|
||||
"kata-types",
|
||||
"lazy_static",
|
||||
@@ -3599,6 +3622,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid 0.4.0",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3806,6 +3830,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "same-file"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
|
||||
dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.22"
|
||||
@@ -5075,6 +5108,16 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
|
||||
|
||||
[[package]]
|
||||
name = "walkdir"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
|
||||
dependencies = [
|
||||
"same-file",
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.1"
|
||||
@@ -5220,6 +5263,15 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||
dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
|
@@ -33,6 +33,8 @@ tokio = { workspace = true, features = ["process"] }
|
||||
tracing = { workspace = true }
|
||||
uuid = { version = "0.4", features = ["v4"] }
|
||||
oci-spec = { workspace = true }
|
||||
inotify = "0.11.0"
|
||||
walkdir = "2.5.0"
|
||||
|
||||
## Dependencies from `rust-netlink`
|
||||
netlink-packet-route = "0.22"
|
||||
|
@@ -5,23 +5,30 @@
|
||||
//
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
collections::{HashSet, VecDeque},
|
||||
fs::File,
|
||||
io::Read,
|
||||
os::unix::fs::MetadataExt,
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use agent::Agent;
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use hypervisor::device::device_manager::DeviceManager;
|
||||
use inotify::{EventMask, Inotify, WatchMask};
|
||||
use kata_sys_util::mount::{get_mount_options, get_mount_path, get_mount_type};
|
||||
use nix::sys::stat::SFlag;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::{
|
||||
io::AsyncReadExt,
|
||||
sync::{Mutex, RwLock},
|
||||
task::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use super::Volume;
|
||||
use crate::share_fs::DEFAULT_KATA_GUEST_SANDBOX_DIR;
|
||||
@@ -31,6 +38,8 @@ use kata_types::mount;
|
||||
use oci_spec::runtime as oci;
|
||||
|
||||
const SYS_MOUNT_PREFIX: [&str; 2] = ["/proc", "/sys"];
|
||||
const MONITOR_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const DEBOUNCE_TIME: Duration = Duration::from_millis(500);
|
||||
|
||||
// copy file to container's rootfs if filesystem sharing is not supported, otherwise
|
||||
// bind mount it in the shared directory.
|
||||
@@ -42,6 +51,198 @@ pub(crate) struct ShareFsVolume {
|
||||
share_fs: Option<Arc<dyn ShareFs>>,
|
||||
mounts: Vec<oci::Mount>,
|
||||
storages: Vec<agent::Storage>,
|
||||
monitor_task: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
/// Directory Monitor Config
|
||||
/// path: the to be watched target directory
|
||||
/// recursive: recursively monitor sub-dirs or not,
|
||||
/// follow_symlinks: track symlinks or not,
|
||||
/// exclude_hidden: exclude hidden files or not,
|
||||
/// watch_events: Watcher Event types with CREATE/DELETE/MODIFY/MOVED_FROM/MOVED_TO
|
||||
#[derive(Clone, Debug)]
|
||||
struct MonitorConfig {
|
||||
path: PathBuf,
|
||||
recursive: bool,
|
||||
follow_symlinks: bool,
|
||||
exclude_hidden: bool,
|
||||
watch_events: WatchMask,
|
||||
}
|
||||
|
||||
impl MonitorConfig {
|
||||
fn new(path: &Path) -> Self {
|
||||
Self {
|
||||
path: path.to_path_buf(),
|
||||
recursive: true,
|
||||
follow_symlinks: false,
|
||||
exclude_hidden: true,
|
||||
watch_events: WatchMask::CREATE
|
||||
| WatchMask::DELETE
|
||||
| WatchMask::MODIFY
|
||||
| WatchMask::MOVED_FROM
|
||||
| WatchMask::MOVED_TO
|
||||
| WatchMask::CLOSE_WRITE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FsWatcher {
|
||||
config: MonitorConfig,
|
||||
inotify: Arc<Mutex<Inotify>>,
|
||||
watch_dirs: Arc<Mutex<HashSet<PathBuf>>>,
|
||||
pending_events: Arc<Mutex<HashSet<PathBuf>>>,
|
||||
need_sync: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl FsWatcher {
|
||||
async fn new(source_path: &Path) -> Result<Self> {
|
||||
let inotify = Inotify::init()?;
|
||||
let mon_cfg = MonitorConfig::new(source_path);
|
||||
let mut watcher = Self {
|
||||
config: mon_cfg,
|
||||
inotify: Arc::new(Mutex::new(inotify)),
|
||||
pending_events: Arc::new(Mutex::new(HashSet::new())),
|
||||
watch_dirs: Arc::new(Mutex::new(HashSet::new())),
|
||||
need_sync: Arc::new(Mutex::new(false)),
|
||||
};
|
||||
|
||||
watcher.add_watchers().await?;
|
||||
|
||||
Ok(watcher)
|
||||
}
|
||||
|
||||
/// add watched directory recursively
|
||||
async fn add_watchers(&mut self) -> Result<()> {
|
||||
let mut watched_dirs = self.watch_dirs.lock().await;
|
||||
let config: &MonitorConfig = &self.config;
|
||||
let walker = WalkDir::new(&config.path)
|
||||
.follow_links(config.follow_symlinks)
|
||||
.min_depth(0)
|
||||
.max_depth(if config.recursive { usize::MAX } else { 1 })
|
||||
.into_iter()
|
||||
.filter_entry(|e| {
|
||||
!(config.exclude_hidden
|
||||
&& e.file_name()
|
||||
.to_str()
|
||||
.map(|s| s.starts_with('.'))
|
||||
.unwrap_or(false))
|
||||
});
|
||||
|
||||
for entry in walker.filter_map(|e| e.ok()) {
|
||||
if entry.file_type().is_dir() {
|
||||
let path = entry.path();
|
||||
if watched_dirs.insert(path.to_path_buf()) {
|
||||
self.inotify
|
||||
.lock()
|
||||
.await
|
||||
.watches()
|
||||
.add(path, config.watch_events)?; // we don't use WatchMask::ALL_EVENTS
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// start monitor
|
||||
pub async fn start_monitor(
|
||||
&self,
|
||||
agent: Arc<dyn Agent>,
|
||||
src: PathBuf,
|
||||
dst: PathBuf,
|
||||
) -> JoinHandle<()> {
|
||||
let need_sync = self.need_sync.clone();
|
||||
let pending_events = self.pending_events.clone();
|
||||
let inotify = self.inotify.clone();
|
||||
let monitor_config = self.config.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut buffer = [0u8; 4096];
|
||||
let mut last_event_time = None;
|
||||
|
||||
loop {
|
||||
// use cloned inotify instance
|
||||
match inotify.lock().await.read_events(&mut buffer) {
|
||||
Ok(events) => {
|
||||
for event in events {
|
||||
if !event.mask.intersects(
|
||||
EventMask::CREATE
|
||||
| EventMask::MODIFY
|
||||
| EventMask::DELETE
|
||||
| EventMask::MOVED_FROM
|
||||
| EventMask::MOVED_TO,
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(file_name) = event.name {
|
||||
let full_path = &monitor_config.path.join(file_name);
|
||||
let event_types: Vec<&str> = event
|
||||
.mask
|
||||
.iter()
|
||||
.map(|m| match m {
|
||||
EventMask::CREATE => "CREATE",
|
||||
EventMask::DELETE => "DELETE",
|
||||
EventMask::MODIFY => "MODIFY",
|
||||
EventMask::MOVED_FROM => "MOVED_FROM",
|
||||
EventMask::MOVED_TO => "MOVED_TO",
|
||||
EventMask::CLOSE_WRITE => "CLOSE_WRITE",
|
||||
_ => "OTHER",
|
||||
})
|
||||
.collect();
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
"handle events [{}] {:?} -> {:?}",
|
||||
event_types.join("|"),
|
||||
event.mask,
|
||||
full_path
|
||||
);
|
||||
pending_events.lock().await.insert(full_path.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!("inotify error: {}", e),
|
||||
}
|
||||
|
||||
// handle events to be synchronized
|
||||
let events_paths = {
|
||||
let mut pending = pending_events.lock().await;
|
||||
pending.drain().collect::<Vec<_>>()
|
||||
};
|
||||
if !events_paths.is_empty() {
|
||||
*need_sync.lock().await = true;
|
||||
last_event_time = Some(Instant::now());
|
||||
}
|
||||
|
||||
// Debounce handling
|
||||
// It is used to prevent unnecessary repeated copies when file changes are triggered
|
||||
// multiple times in a short period; we only execute the last one.
|
||||
if let Some(t) = last_event_time {
|
||||
if Instant::now().duration_since(t) > DEBOUNCE_TIME && *need_sync.lock().await {
|
||||
info!(sl!(), "debounce handle copyfile {:?} -> {:?}", &src, &dst);
|
||||
if let Err(e) =
|
||||
copy_dir_recursively(&src, &dst.display().to_string(), &agent).await
|
||||
{
|
||||
error!(
|
||||
sl!(),
|
||||
"debounce handle copyfile {:?} -> {:?} failed with error: {:?}",
|
||||
&src,
|
||||
&dst,
|
||||
e
|
||||
);
|
||||
eprintln!("sync host/guest files failed: {}", e);
|
||||
}
|
||||
*need_sync.lock().await = false;
|
||||
last_event_time = None;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(MONITOR_INTERVAL).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ShareFsVolume {
|
||||
@@ -65,6 +266,7 @@ impl ShareFsVolume {
|
||||
share_fs: share_fs.as_ref().map(Arc::clone),
|
||||
mounts: vec![],
|
||||
storages: vec![],
|
||||
monitor_task: None,
|
||||
};
|
||||
match share_fs {
|
||||
None => {
|
||||
@@ -200,6 +402,13 @@ impl ShareFsVolume {
|
||||
oci_mount.set_source(Some(PathBuf::from(&dest_dir)));
|
||||
oci_mount.set_options(Some(options));
|
||||
volume.mounts.push(oci_mount);
|
||||
|
||||
// start monitoring
|
||||
let watcher = FsWatcher::new(Path::new(&source_path)).await?;
|
||||
let monitor_task = watcher
|
||||
.start_monitor(agent.clone(), src.clone(), dest_dir.into())
|
||||
.await;
|
||||
volume.monitor_task = Some(monitor_task);
|
||||
} else {
|
||||
// If not, we can ignore it. Let's issue a warning so that the user knows.
|
||||
warn!(
|
||||
|
Reference in New Issue
Block a user