mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-09-03 18:04:16 +00:00
Merge pull request #11240 from Apokleos/copydir
runtime-rs: Propagate k8s configs correctly when sharedfs is disabled
This commit is contained in:
@@ -2013,22 +2013,35 @@ fn do_copy_file(req: &CopyFileRequest) -> Result<()> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create parent directories if missing
|
||||||
if let Some(parent) = path.parent() {
|
if let Some(parent) = path.parent() {
|
||||||
if !parent.exists() {
|
if !parent.exists() {
|
||||||
let dir = parent.to_path_buf();
|
let dir = parent.to_path_buf();
|
||||||
|
// Attempt to create directory, ignore AlreadyExists errors
|
||||||
if let Err(e) = fs::create_dir_all(&dir) {
|
if let Err(e) = fs::create_dir_all(&dir) {
|
||||||
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
||||||
return Err(e.into());
|
return Err(e.into());
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(req.dir_mode))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set directory permissions and ownership
|
||||||
|
std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(req.dir_mode))?;
|
||||||
|
unistd::chown(
|
||||||
|
&dir,
|
||||||
|
Some(Uid::from_raw(req.uid as u32)),
|
||||||
|
Some(Gid::from_raw(req.gid as u32)),
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let sflag = stat::SFlag::from_bits_truncate(req.file_mode);
|
let sflag = stat::SFlag::from_bits_truncate(req.file_mode);
|
||||||
|
|
||||||
if sflag.contains(stat::SFlag::S_IFDIR) {
|
if sflag.contains(stat::SFlag::S_IFDIR) {
|
||||||
|
// Remove existing non-directory file if present
|
||||||
|
if path.exists() && !path.is_dir() {
|
||||||
|
fs::remove_file(&path)?;
|
||||||
|
}
|
||||||
|
|
||||||
fs::create_dir(&path).or_else(|e| {
|
fs::create_dir(&path).or_else(|e| {
|
||||||
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
@@ -2047,16 +2060,25 @@ fn do_copy_file(req: &CopyFileRequest) -> Result<()> {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle symlink creation
|
||||||
if sflag.contains(stat::SFlag::S_IFLNK) {
|
if sflag.contains(stat::SFlag::S_IFLNK) {
|
||||||
// After kubernetes secret's volume update, the '..data' symlink should point to
|
// Clean up existing path (whether symlink, dir, or file)
|
||||||
// the new timestamped directory.
|
if path.exists() || path.is_symlink() {
|
||||||
// TODO:The old and deleted timestamped dir still exists due to missing DELETE api in agent.
|
// Use appropriate removal method based on path type
|
||||||
// Hence, Unlink the existing symlink.
|
if path.is_symlink() {
|
||||||
if path.is_symlink() && path.exists() {
|
unistd::unlink(&path)?;
|
||||||
unistd::unlink(&path)?;
|
} else if path.is_dir() {
|
||||||
|
fs::remove_dir_all(&path)?;
|
||||||
|
} else {
|
||||||
|
fs::remove_file(&path)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create new symbolic link
|
||||||
let src = PathBuf::from(OsStr::from_bytes(&req.data));
|
let src = PathBuf::from(OsStr::from_bytes(&req.data));
|
||||||
unistd::symlinkat(&src, None, &path)?;
|
unistd::symlinkat(&src, None, &path)?;
|
||||||
|
|
||||||
|
// Set symlink ownership (permissions not supported for symlinks)
|
||||||
let path_str = CString::new(path.as_os_str().as_bytes())?;
|
let path_str = CString::new(path.as_os_str().as_bytes())?;
|
||||||
|
|
||||||
let ret = unsafe { libc::lchown(path_str.as_ptr(), req.uid as u32, req.gid as u32) };
|
let ret = unsafe { libc::lchown(path_str.as_ptr(), req.uid as u32, req.gid as u32) };
|
||||||
@@ -2071,7 +2093,7 @@ fn do_copy_file(req: &CopyFileRequest) -> Result<()> {
|
|||||||
let file = OpenOptions::new()
|
let file = OpenOptions::new()
|
||||||
.write(true)
|
.write(true)
|
||||||
.create(true)
|
.create(true)
|
||||||
.truncate(false)
|
.truncate(req.offset == 0) // Only truncate when offset is 0
|
||||||
.open(&tmpfile)?;
|
.open(&tmpfile)?;
|
||||||
|
|
||||||
file.write_all_at(req.data.as_slice(), req.offset as u64)?;
|
file.write_all_at(req.data.as_slice(), req.offset as u64)?;
|
||||||
@@ -2089,6 +2111,15 @@ fn do_copy_file(req: &CopyFileRequest) -> Result<()> {
|
|||||||
Some(Gid::from_raw(req.gid as u32)),
|
Some(Gid::from_raw(req.gid as u32)),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Remove existing target path before rename
|
||||||
|
if path.exists() || path.is_symlink() {
|
||||||
|
if path.is_dir() {
|
||||||
|
fs::remove_dir_all(&path)?;
|
||||||
|
} else {
|
||||||
|
fs::remove_file(&path)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fs::rename(tmpfile, path)?;
|
fs::rename(tmpfile, path)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@@ -16,6 +16,10 @@ const K8S_EMPTY_DIR: &str = "kubernetes.io~empty-dir";
|
|||||||
const K8S_CONFIGMAP: &str = "kubernetes.io~configmap";
|
const K8S_CONFIGMAP: &str = "kubernetes.io~configmap";
|
||||||
// K8S_SECRET is the K8s specific path for `secret` volumes
|
// K8S_SECRET is the K8s specific path for `secret` volumes
|
||||||
const K8S_SECRET: &str = "kubernetes.io~secret";
|
const K8S_SECRET: &str = "kubernetes.io~secret";
|
||||||
|
// K8S_PROJECTED is the K8s specific path for `projected` volumes
|
||||||
|
const K8S_PROJECTED: &str = "kubernetes.io~projected";
|
||||||
|
// K8S_DOWNWARD_API is the K8s specific path for `downward-api` volumes
|
||||||
|
const K8S_DOWNWARD_API: &str = "kubernetes.io~downward-api";
|
||||||
|
|
||||||
/// Check whether the path is a K8s empty directory.
|
/// Check whether the path is a K8s empty directory.
|
||||||
pub fn is_empty_dir<P: AsRef<Path>>(path: P) -> bool {
|
pub fn is_empty_dir<P: AsRef<Path>>(path: P) -> bool {
|
||||||
@@ -32,6 +36,16 @@ pub fn is_secret<P: AsRef<Path>>(path: P) -> bool {
|
|||||||
is_special_dir(path, K8S_SECRET)
|
is_special_dir(path, K8S_SECRET)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check whether the path is a K8s projected volume.
|
||||||
|
pub fn is_projected<P: AsRef<Path>>(path: P) -> bool {
|
||||||
|
is_special_dir(path, K8S_PROJECTED)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether the path is a K8s downward-api volume.
|
||||||
|
pub fn is_downward_api<P: AsRef<Path>>(path: P) -> bool {
|
||||||
|
is_special_dir(path, K8S_DOWNWARD_API)
|
||||||
|
}
|
||||||
|
|
||||||
/// Check whether the path is a K8s empty directory, configmap, or secret.
|
/// Check whether the path is a K8s empty directory, configmap, or secret.
|
||||||
///
|
///
|
||||||
/// For example, given a K8s EmptyDir, Kubernetes mounts
|
/// For example, given a K8s EmptyDir, Kubernetes mounts
|
||||||
@@ -321,6 +335,36 @@ mod tests {
|
|||||||
assert!(!is_secret(path));
|
assert!(!is_secret(path));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_projected() {
|
||||||
|
let path = "/volumes/kubernetes.io~projected/foo";
|
||||||
|
assert!(is_projected(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~projected//foo";
|
||||||
|
assert!(is_projected(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~projected-test/foo";
|
||||||
|
assert!(!is_projected(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~projected";
|
||||||
|
assert!(!is_projected(path));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_downward_api() {
|
||||||
|
let path = "/volumes/kubernetes.io~downward-api/foo";
|
||||||
|
assert!(is_downward_api(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~downward-api//foo";
|
||||||
|
assert!(is_downward_api(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~downward-api-test/foo";
|
||||||
|
assert!(!is_downward_api(path));
|
||||||
|
|
||||||
|
let path = "/volumes/kubernetes.io~downward-api";
|
||||||
|
assert!(!is_downward_api(path));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_container_type() {
|
fn test_container_type() {
|
||||||
let sid = "sid".to_string();
|
let sid = "sid".to_string();
|
||||||
|
52
src/runtime-rs/Cargo.lock
generated
52
src/runtime-rs/Cargo.lock
generated
@@ -1860,6 +1860,28 @@ dependencies = [
|
|||||||
"hashbrown 0.15.2",
|
"hashbrown 0.15.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "inotify"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.9.0",
|
||||||
|
"futures-core",
|
||||||
|
"inotify-sys",
|
||||||
|
"libc",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "inotify-sys"
|
||||||
|
version = "0.1.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "instant"
|
name = "instant"
|
||||||
version = "0.1.12"
|
version = "0.1.12"
|
||||||
@@ -3575,6 +3597,7 @@ dependencies = [
|
|||||||
"cgroups-rs",
|
"cgroups-rs",
|
||||||
"futures 0.3.28",
|
"futures 0.3.28",
|
||||||
"hypervisor",
|
"hypervisor",
|
||||||
|
"inotify",
|
||||||
"kata-sys-util",
|
"kata-sys-util",
|
||||||
"kata-types",
|
"kata-types",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
@@ -3599,6 +3622,7 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid 0.4.0",
|
"uuid 0.4.0",
|
||||||
|
"walkdir",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -3806,6 +3830,15 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "same-file"
|
||||||
|
version = "1.0.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
|
||||||
|
dependencies = [
|
||||||
|
"winapi-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "schannel"
|
name = "schannel"
|
||||||
version = "0.1.22"
|
version = "0.1.22"
|
||||||
@@ -5075,6 +5108,16 @@ version = "1.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
|
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "walkdir"
|
||||||
|
version = "2.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
|
||||||
|
dependencies = [
|
||||||
|
"same-file",
|
||||||
|
"winapi-util",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "want"
|
name = "want"
|
||||||
version = "0.3.1"
|
version = "0.3.1"
|
||||||
@@ -5220,6 +5263,15 @@ version = "0.4.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winapi-util"
|
||||||
|
version = "0.1.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||||
|
dependencies = [
|
||||||
|
"windows-sys 0.52.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi-x86_64-pc-windows-gnu"
|
name = "winapi-x86_64-pc-windows-gnu"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@@ -33,6 +33,8 @@ tokio = { workspace = true, features = ["process"] }
|
|||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
uuid = { version = "0.4", features = ["v4"] }
|
uuid = { version = "0.4", features = ["v4"] }
|
||||||
oci-spec = { workspace = true }
|
oci-spec = { workspace = true }
|
||||||
|
inotify = "0.11.0"
|
||||||
|
walkdir = "2.5.0"
|
||||||
|
|
||||||
## Dependencies from `rust-netlink`
|
## Dependencies from `rust-netlink`
|
||||||
netlink-packet-route = "0.22"
|
netlink-packet-route = "0.22"
|
||||||
|
@@ -5,29 +5,44 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::{HashSet, VecDeque},
|
||||||
fs::File,
|
fs::File,
|
||||||
io::Read,
|
io::Read,
|
||||||
os::unix::fs::MetadataExt,
|
os::unix::fs::MetadataExt,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use agent::Agent;
|
use agent::Agent;
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use hypervisor::device::device_manager::DeviceManager;
|
use hypervisor::device::device_manager::DeviceManager;
|
||||||
|
use inotify::{EventMask, Inotify, WatchMask};
|
||||||
use kata_sys_util::mount::{get_mount_options, get_mount_path, get_mount_type};
|
use kata_sys_util::mount::{get_mount_options, get_mount_path, get_mount_type};
|
||||||
use tokio::sync::RwLock;
|
use nix::sys::stat::SFlag;
|
||||||
|
use tokio::{
|
||||||
|
io::AsyncReadExt,
|
||||||
|
sync::{Mutex, RwLock},
|
||||||
|
task::JoinHandle,
|
||||||
|
time::Instant,
|
||||||
|
};
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use super::Volume;
|
use super::Volume;
|
||||||
use crate::share_fs::DEFAULT_KATA_GUEST_SANDBOX_DIR;
|
use crate::share_fs::DEFAULT_KATA_GUEST_SANDBOX_DIR;
|
||||||
use crate::share_fs::PASSTHROUGH_FS_DIR;
|
use crate::share_fs::PASSTHROUGH_FS_DIR;
|
||||||
use crate::share_fs::{MountedInfo, ShareFs, ShareFsVolumeConfig};
|
use crate::share_fs::{MountedInfo, ShareFs, ShareFsVolumeConfig};
|
||||||
use kata_types::mount;
|
use kata_types::{
|
||||||
|
k8s::{is_configmap, is_downward_api, is_projected, is_secret},
|
||||||
|
mount,
|
||||||
|
};
|
||||||
use oci_spec::runtime as oci;
|
use oci_spec::runtime as oci;
|
||||||
|
|
||||||
const SYS_MOUNT_PREFIX: [&str; 2] = ["/proc", "/sys"];
|
const SYS_MOUNT_PREFIX: [&str; 2] = ["/proc", "/sys"];
|
||||||
|
const MONITOR_INTERVAL: Duration = Duration::from_millis(100);
|
||||||
|
const DEBOUNCE_TIME: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
// copy file to container's rootfs if filesystem sharing is not supported, otherwise
|
// copy file to container's rootfs if filesystem sharing is not supported, otherwise
|
||||||
// bind mount it in the shared directory.
|
// bind mount it in the shared directory.
|
||||||
@@ -39,6 +54,198 @@ pub(crate) struct ShareFsVolume {
|
|||||||
share_fs: Option<Arc<dyn ShareFs>>,
|
share_fs: Option<Arc<dyn ShareFs>>,
|
||||||
mounts: Vec<oci::Mount>,
|
mounts: Vec<oci::Mount>,
|
||||||
storages: Vec<agent::Storage>,
|
storages: Vec<agent::Storage>,
|
||||||
|
monitor_task: Option<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Directory Monitor Config
|
||||||
|
/// path: the to be watched target directory
|
||||||
|
/// recursive: recursively monitor sub-dirs or not,
|
||||||
|
/// follow_symlinks: track symlinks or not,
|
||||||
|
/// exclude_hidden: exclude hidden files or not,
|
||||||
|
/// watch_events: Watcher Event types with CREATE/DELETE/MODIFY/MOVED_FROM/MOVED_TO
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct MonitorConfig {
|
||||||
|
path: PathBuf,
|
||||||
|
recursive: bool,
|
||||||
|
follow_symlinks: bool,
|
||||||
|
exclude_hidden: bool,
|
||||||
|
watch_events: WatchMask,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MonitorConfig {
|
||||||
|
fn new(path: &Path) -> Self {
|
||||||
|
Self {
|
||||||
|
path: path.to_path_buf(),
|
||||||
|
recursive: true,
|
||||||
|
follow_symlinks: false,
|
||||||
|
exclude_hidden: true,
|
||||||
|
watch_events: WatchMask::CREATE
|
||||||
|
| WatchMask::DELETE
|
||||||
|
| WatchMask::MODIFY
|
||||||
|
| WatchMask::MOVED_FROM
|
||||||
|
| WatchMask::MOVED_TO
|
||||||
|
| WatchMask::CLOSE_WRITE,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct FsWatcher {
|
||||||
|
config: MonitorConfig,
|
||||||
|
inotify: Arc<Mutex<Inotify>>,
|
||||||
|
watch_dirs: Arc<Mutex<HashSet<PathBuf>>>,
|
||||||
|
pending_events: Arc<Mutex<HashSet<PathBuf>>>,
|
||||||
|
need_sync: Arc<Mutex<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FsWatcher {
|
||||||
|
async fn new(source_path: &Path) -> Result<Self> {
|
||||||
|
let inotify = Inotify::init()?;
|
||||||
|
let mon_cfg = MonitorConfig::new(source_path);
|
||||||
|
let mut watcher = Self {
|
||||||
|
config: mon_cfg,
|
||||||
|
inotify: Arc::new(Mutex::new(inotify)),
|
||||||
|
pending_events: Arc::new(Mutex::new(HashSet::new())),
|
||||||
|
watch_dirs: Arc::new(Mutex::new(HashSet::new())),
|
||||||
|
need_sync: Arc::new(Mutex::new(false)),
|
||||||
|
};
|
||||||
|
|
||||||
|
watcher.add_watchers().await?;
|
||||||
|
|
||||||
|
Ok(watcher)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// add watched directory recursively
|
||||||
|
async fn add_watchers(&mut self) -> Result<()> {
|
||||||
|
let mut watched_dirs = self.watch_dirs.lock().await;
|
||||||
|
let config: &MonitorConfig = &self.config;
|
||||||
|
let walker = WalkDir::new(&config.path)
|
||||||
|
.follow_links(config.follow_symlinks)
|
||||||
|
.min_depth(0)
|
||||||
|
.max_depth(if config.recursive { usize::MAX } else { 1 })
|
||||||
|
.into_iter()
|
||||||
|
.filter_entry(|e| {
|
||||||
|
!(config.exclude_hidden
|
||||||
|
&& e.file_name()
|
||||||
|
.to_str()
|
||||||
|
.map(|s| s.starts_with('.'))
|
||||||
|
.unwrap_or(false))
|
||||||
|
});
|
||||||
|
|
||||||
|
for entry in walker.filter_map(|e| e.ok()) {
|
||||||
|
if entry.file_type().is_dir() {
|
||||||
|
let path = entry.path();
|
||||||
|
if watched_dirs.insert(path.to_path_buf()) {
|
||||||
|
self.inotify
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.watches()
|
||||||
|
.add(path, config.watch_events)?; // we don't use WatchMask::ALL_EVENTS
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// start monitor
|
||||||
|
pub async fn start_monitor(
|
||||||
|
&self,
|
||||||
|
agent: Arc<dyn Agent>,
|
||||||
|
src: PathBuf,
|
||||||
|
dst: PathBuf,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
let need_sync = self.need_sync.clone();
|
||||||
|
let pending_events = self.pending_events.clone();
|
||||||
|
let inotify = self.inotify.clone();
|
||||||
|
let monitor_config = self.config.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut buffer = [0u8; 4096];
|
||||||
|
let mut last_event_time = None;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// use cloned inotify instance
|
||||||
|
match inotify.lock().await.read_events(&mut buffer) {
|
||||||
|
Ok(events) => {
|
||||||
|
for event in events {
|
||||||
|
if !event.mask.intersects(
|
||||||
|
EventMask::CREATE
|
||||||
|
| EventMask::MODIFY
|
||||||
|
| EventMask::DELETE
|
||||||
|
| EventMask::MOVED_FROM
|
||||||
|
| EventMask::MOVED_TO,
|
||||||
|
) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(file_name) = event.name {
|
||||||
|
let full_path = &monitor_config.path.join(file_name);
|
||||||
|
let event_types: Vec<&str> = event
|
||||||
|
.mask
|
||||||
|
.iter()
|
||||||
|
.map(|m| match m {
|
||||||
|
EventMask::CREATE => "CREATE",
|
||||||
|
EventMask::DELETE => "DELETE",
|
||||||
|
EventMask::MODIFY => "MODIFY",
|
||||||
|
EventMask::MOVED_FROM => "MOVED_FROM",
|
||||||
|
EventMask::MOVED_TO => "MOVED_TO",
|
||||||
|
EventMask::CLOSE_WRITE => "CLOSE_WRITE",
|
||||||
|
_ => "OTHER",
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
sl!(),
|
||||||
|
"handle events [{}] {:?} -> {:?}",
|
||||||
|
event_types.join("|"),
|
||||||
|
event.mask,
|
||||||
|
full_path
|
||||||
|
);
|
||||||
|
pending_events.lock().await.insert(full_path.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => eprintln!("inotify error: {}", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle events to be synchronized
|
||||||
|
let events_paths = {
|
||||||
|
let mut pending = pending_events.lock().await;
|
||||||
|
pending.drain().collect::<Vec<_>>()
|
||||||
|
};
|
||||||
|
if !events_paths.is_empty() {
|
||||||
|
*need_sync.lock().await = true;
|
||||||
|
last_event_time = Some(Instant::now());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debounce handling
|
||||||
|
// It is used to prevent unnecessary repeated copies when file changes are triggered
|
||||||
|
// multiple times in a short period; we only execute the last one.
|
||||||
|
if let Some(t) = last_event_time {
|
||||||
|
if Instant::now().duration_since(t) > DEBOUNCE_TIME && *need_sync.lock().await {
|
||||||
|
info!(sl!(), "debounce handle copyfile {:?} -> {:?}", &src, &dst);
|
||||||
|
if let Err(e) =
|
||||||
|
copy_dir_recursively(&src, &dst.display().to_string(), &agent).await
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
sl!(),
|
||||||
|
"debounce handle copyfile {:?} -> {:?} failed with error: {:?}",
|
||||||
|
&src,
|
||||||
|
&dst,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
eprintln!("sync host/guest files failed: {}", e);
|
||||||
|
}
|
||||||
|
*need_sync.lock().await = false;
|
||||||
|
last_event_time = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(MONITOR_INTERVAL).await;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShareFsVolume {
|
impl ShareFsVolume {
|
||||||
@@ -62,6 +269,7 @@ impl ShareFsVolume {
|
|||||||
share_fs: share_fs.as_ref().map(Arc::clone),
|
share_fs: share_fs.as_ref().map(Arc::clone),
|
||||||
mounts: vec![],
|
mounts: vec![],
|
||||||
storages: vec![],
|
storages: vec![],
|
||||||
|
monitor_task: None,
|
||||||
};
|
};
|
||||||
match share_fs {
|
match share_fs {
|
||||||
None => {
|
None => {
|
||||||
@@ -133,6 +341,81 @@ impl ShareFsVolume {
|
|||||||
oci_mount.set_source(Some(PathBuf::from(&dest)));
|
oci_mount.set_source(Some(PathBuf::from(&dest)));
|
||||||
oci_mount.set_options(m.options().clone());
|
oci_mount.set_options(m.options().clone());
|
||||||
volume.mounts.push(oci_mount);
|
volume.mounts.push(oci_mount);
|
||||||
|
} else if is_allowlisted_copy_volume(&src) {
|
||||||
|
// For security reasons, we have restricted directory copying. Currently, only directories under
|
||||||
|
// the path `/var/lib/kubelet/pods/<uid>/volumes/{kubernetes.io~configmap, kubernetes.io~secret, kubernetes.io~downward-api, kubernetes.io~projected}`
|
||||||
|
// are allowed to be copied into the guest. Copying of other directories will be prohibited.
|
||||||
|
|
||||||
|
// source_path: "/var/lib/kubelet/pods/6dad7281-57ff-49e4-b844-c588ceabec16/volumes/kubernetes.io~projected/kube-api-access-8s2nl"
|
||||||
|
info!(sl!(), "copying directory {:?} to guest", &source_path);
|
||||||
|
|
||||||
|
// create target path in guest
|
||||||
|
let dest_dir = [
|
||||||
|
DEFAULT_KATA_GUEST_SANDBOX_DIR,
|
||||||
|
PASSTHROUGH_FS_DIR,
|
||||||
|
file_name.clone().as_str(),
|
||||||
|
]
|
||||||
|
.join("/");
|
||||||
|
|
||||||
|
// create directory
|
||||||
|
let dir_metadata = std::fs::metadata(src.clone())
|
||||||
|
.context(format!("read metadata from directory: {:?}", src))?;
|
||||||
|
|
||||||
|
// ttRPC request for creating directory
|
||||||
|
let dir_request = agent::CopyFileRequest {
|
||||||
|
path: dest_dir.clone(),
|
||||||
|
file_size: 0, // useless for dir
|
||||||
|
uid: dir_metadata.uid() as i32,
|
||||||
|
gid: dir_metadata.gid() as i32,
|
||||||
|
dir_mode: dir_metadata.mode(),
|
||||||
|
file_mode: SFlag::S_IFDIR.bits(),
|
||||||
|
data: vec![], // no files
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// dest_dir: "/run/kata-containers/sandbox/passthrough/sandbox-b2790ec0-kube-api-access-8s2nl"
|
||||||
|
info!(
|
||||||
|
sl!(),
|
||||||
|
"creating directory: {:?} in sandbox with file_mode: {:?}",
|
||||||
|
dest_dir,
|
||||||
|
dir_request.file_mode
|
||||||
|
);
|
||||||
|
|
||||||
|
// send request for creating directory
|
||||||
|
agent
|
||||||
|
.copy_file(dir_request)
|
||||||
|
.await
|
||||||
|
.context(format!("create directory in sandbox: {:?}", dest_dir))?;
|
||||||
|
|
||||||
|
// recursively copy files from this directory
|
||||||
|
// similar to `scp -r $source_dir $target_dir`
|
||||||
|
copy_dir_recursively(src.clone(), &dest_dir, &agent)
|
||||||
|
.await
|
||||||
|
.context(format!("failed to copy directory contents: {:?}", src))?;
|
||||||
|
|
||||||
|
// handle special mount options
|
||||||
|
let mut options = m.options().clone().unwrap_or_default();
|
||||||
|
if !options.iter().any(|x| x == "rbind") {
|
||||||
|
options.push("rbind".into());
|
||||||
|
}
|
||||||
|
if !options.iter().any(|x| x == "rprivate") {
|
||||||
|
options.push("rprivate".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// add OCI Mount
|
||||||
|
let mut oci_mount = oci::Mount::default();
|
||||||
|
oci_mount.set_destination(m.destination().clone());
|
||||||
|
oci_mount.set_typ(Some("bind".to_string()));
|
||||||
|
oci_mount.set_source(Some(PathBuf::from(&dest_dir)));
|
||||||
|
oci_mount.set_options(Some(options));
|
||||||
|
volume.mounts.push(oci_mount);
|
||||||
|
|
||||||
|
// start monitoring
|
||||||
|
let watcher = FsWatcher::new(Path::new(&source_path)).await?;
|
||||||
|
let monitor_task = watcher
|
||||||
|
.start_monitor(agent.clone(), src.clone(), dest_dir.into())
|
||||||
|
.await;
|
||||||
|
volume.monitor_task = Some(monitor_task);
|
||||||
} else {
|
} else {
|
||||||
// If not, we can ignore it. Let's issue a warning so that the user knows.
|
// If not, we can ignore it. Let's issue a warning so that the user knows.
|
||||||
warn!(
|
warn!(
|
||||||
@@ -307,6 +590,130 @@ impl Volume for ShareFsVolume {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
async fn copy_dir_recursively<P: AsRef<Path>>(
|
||||||
|
src_dir: P,
|
||||||
|
dest_dir: &str,
|
||||||
|
agent: &Arc<dyn Agent>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut queue = VecDeque::new();
|
||||||
|
queue.push_back((src_dir.as_ref().to_path_buf(), dest_dir.to_string()));
|
||||||
|
|
||||||
|
while let Some((current_src, current_dest)) = queue.pop_front() {
|
||||||
|
let mut entries = tokio::fs::read_dir(¤t_src)
|
||||||
|
.await
|
||||||
|
.context(format!("read directory: {:?}", current_src))?;
|
||||||
|
|
||||||
|
while let Some(entry) = entries
|
||||||
|
.next_entry()
|
||||||
|
.await
|
||||||
|
.context(format!("read directory entry in {:?}", current_src))?
|
||||||
|
{
|
||||||
|
let entry_path = entry.path();
|
||||||
|
let file_name = entry_path
|
||||||
|
.file_name()
|
||||||
|
.ok_or_else(|| anyhow!("get file name for {:?}", entry_path))?
|
||||||
|
.to_string_lossy()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let dest_path = format!("{}/{}", current_dest, file_name);
|
||||||
|
|
||||||
|
let metadata = entry
|
||||||
|
.metadata()
|
||||||
|
.await
|
||||||
|
.context(format!("read metadata for {:?}", entry_path))?;
|
||||||
|
|
||||||
|
if metadata.is_symlink() {
|
||||||
|
// handle symlinks
|
||||||
|
let entry_path_err = entry_path.clone();
|
||||||
|
let entry_path_clone = entry_path.clone();
|
||||||
|
let link_target =
|
||||||
|
tokio::task::spawn_blocking(move || std::fs::read_link(&entry_path_clone))
|
||||||
|
.await
|
||||||
|
.context(format!(
|
||||||
|
"failed to spawn blocking task for symlink: {:?}",
|
||||||
|
entry_path_err
|
||||||
|
))??;
|
||||||
|
|
||||||
|
let link_target_str = link_target.to_string_lossy().into_owned();
|
||||||
|
let symlink_request = agent::CopyFileRequest {
|
||||||
|
path: dest_path.clone(),
|
||||||
|
file_size: link_target_str.len() as i64,
|
||||||
|
uid: metadata.uid() as i32,
|
||||||
|
gid: metadata.gid() as i32,
|
||||||
|
file_mode: SFlag::S_IFLNK.bits(),
|
||||||
|
data: link_target_str.clone().into_bytes(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
info!(
|
||||||
|
sl!(),
|
||||||
|
"copying symlink_request {:?} in sandbox with file_mode: {:?}",
|
||||||
|
dest_path.clone(),
|
||||||
|
symlink_request.file_mode
|
||||||
|
);
|
||||||
|
|
||||||
|
agent.copy_file(symlink_request).await.context(format!(
|
||||||
|
"failed to create symlink: {:?} -> {:?}",
|
||||||
|
dest_path, link_target_str
|
||||||
|
))?;
|
||||||
|
} else if metadata.is_dir() {
|
||||||
|
// handle directory
|
||||||
|
let dir_request = agent::CopyFileRequest {
|
||||||
|
path: dest_path.clone(),
|
||||||
|
file_size: 0,
|
||||||
|
uid: metadata.uid() as i32,
|
||||||
|
gid: metadata.gid() as i32,
|
||||||
|
dir_mode: metadata.mode(),
|
||||||
|
file_mode: SFlag::S_IFDIR.bits(),
|
||||||
|
data: vec![],
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
info!(
|
||||||
|
sl!(),
|
||||||
|
"copying subdirectory {:?} in sandbox with file_mode: {:?}",
|
||||||
|
dir_request.path,
|
||||||
|
dir_request.file_mode
|
||||||
|
);
|
||||||
|
agent
|
||||||
|
.copy_file(dir_request)
|
||||||
|
.await
|
||||||
|
.context(format!("Failed to create subdirectory: {:?}", dest_path))?;
|
||||||
|
|
||||||
|
// push back the sub-dir into queue to handle it in time
|
||||||
|
queue.push_back((entry_path, dest_path));
|
||||||
|
} else if metadata.is_file() {
|
||||||
|
// async read file
|
||||||
|
let mut file = tokio::fs::File::open(&entry_path)
|
||||||
|
.await
|
||||||
|
.context(format!("open file: {:?}", entry_path))?;
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
file.read_to_end(&mut buffer)
|
||||||
|
.await
|
||||||
|
.context(format!("read file: {:?}", entry_path))?;
|
||||||
|
|
||||||
|
let file_request = agent::CopyFileRequest {
|
||||||
|
path: dest_path.clone(),
|
||||||
|
file_size: metadata.len() as i64,
|
||||||
|
uid: metadata.uid() as i32,
|
||||||
|
gid: metadata.gid() as i32,
|
||||||
|
file_mode: SFlag::S_IFREG.bits(),
|
||||||
|
data: buffer,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
info!(sl!(), "copy file {:?} to guest", dest_path.clone());
|
||||||
|
agent
|
||||||
|
.copy_file(file_request)
|
||||||
|
.await
|
||||||
|
.context(format!("copy file: {:?} -> {:?}", entry_path, dest_path))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn is_share_fs_volume(m: &oci::Mount) -> bool {
|
pub(crate) fn is_share_fs_volume(m: &oci::Mount) -> bool {
|
||||||
let mount_type = get_mount_type(m);
|
let mount_type = get_mount_type(m);
|
||||||
(mount_type == "bind" || mount_type == mount::KATA_EPHEMERAL_VOLUME_TYPE)
|
(mount_type == "bind" || mount_type == mount::KATA_EPHEMERAL_VOLUME_TYPE)
|
||||||
@@ -363,6 +770,20 @@ pub fn generate_mount_path(id: &str, file_name: &str) -> String {
|
|||||||
format!("{}-{}-{}", nid, uid, file_name)
|
format!("{}-{}-{}", nid, uid, file_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This function is used to check whether a given volume is in the allowed copy allowlist.
|
||||||
|
/// More specifically, it determines whether the volume's path is located under a predefined
|
||||||
|
/// list of allowed copy directories.
|
||||||
|
pub(crate) fn is_allowlisted_copy_volume(source_path: &PathBuf) -> bool {
|
||||||
|
if !source_path.is_dir() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// allowlist: { kubernetes.io~projected, kubernetes.io~configmap, kubernetes.io~secret, kubernetes.io~downward-api }
|
||||||
|
is_projected(source_path)
|
||||||
|
|| is_downward_api(source_path)
|
||||||
|
|| is_secret(source_path)
|
||||||
|
|| is_configmap(source_path)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -381,4 +802,34 @@ mod test {
|
|||||||
assert!(is_system_mount(proc_sub_dir));
|
assert!(is_system_mount(proc_sub_dir));
|
||||||
assert!(!is_system_mount(not_sys_dir));
|
assert!(!is_system_mount(not_sys_dir));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_allowlisted_copy_volume() {
|
||||||
|
// The configmap is /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~configmap/kube-configmap-0s2no/{..data, key1, key2,...}
|
||||||
|
// The secret is /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~secret/kube-secret-2s2np/{..data, key1, key2,...}
|
||||||
|
// The projected is /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~projected/kube-api-access-8s2nl/{..data, key1, key2,...}
|
||||||
|
// The downward-api is /var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~downward-api/downward-api-xxxx/{..data, key1, key2,...}
|
||||||
|
let configmap =
|
||||||
|
"var/lib/kubelet/pods/1000/volumes/kubernetes.io~configmap/kube-configmap-0s2no";
|
||||||
|
let secret = "var/lib/kubelet/pods/1000/volumes/kubernetes.io~secret/kube-secret-2s2np";
|
||||||
|
let projected =
|
||||||
|
"var/lib/kubelet/1000/<uid>/volumes/kubernetes.io~projected/kube-api-access-8s2nl";
|
||||||
|
let downward_api =
|
||||||
|
"var/lib/kubelet/1000/<uid>/volumes/kubernetes.io~downward-api/downward-api-xxxx";
|
||||||
|
|
||||||
|
let temp_dir = tempfile::tempdir().unwrap();
|
||||||
|
let cm_path = temp_dir.path().join(configmap);
|
||||||
|
std::fs::create_dir_all(&cm_path).unwrap();
|
||||||
|
let secret_path = temp_dir.path().join(secret);
|
||||||
|
std::fs::create_dir_all(&secret_path).unwrap();
|
||||||
|
let projected_path = temp_dir.path().join(projected);
|
||||||
|
std::fs::create_dir_all(&projected_path).unwrap();
|
||||||
|
let downward_api_path = temp_dir.path().join(downward_api);
|
||||||
|
std::fs::create_dir_all(&downward_api_path).unwrap();
|
||||||
|
|
||||||
|
assert!(is_allowlisted_copy_volume(&cm_path));
|
||||||
|
assert!(is_allowlisted_copy_volume(&secret_path));
|
||||||
|
assert!(is_allowlisted_copy_volume(&projected_path));
|
||||||
|
assert!(is_allowlisted_copy_volume(&downward_api_path));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user