runtime-rs: support watchable mount

Use watchable mount to support inotify for virtio-fs.

Fixes: #5184

Signed-off-by: Bin Liu <bin@hyper.sh>
This commit is contained in:
Bin Liu 2022-09-23 11:50:12 +08:00
parent e05e42fd3c
commit 4a763925e5
9 changed files with 352 additions and 30 deletions

View File

@ -11,7 +11,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use anyhow::{ensure, Context, Result};
use anyhow::{anyhow, ensure, Context, Result};
use async_recursion::async_recursion;
use nix::mount::{umount, MsFlags};
use nix::unistd::{Gid, Uid};
@ -34,9 +34,13 @@ const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;
/// How often to check for modified files.
const WATCH_INTERVAL_SECS: u64 = 2;
/// Destination path for tmpfs
/// Destination path for tmpfs, which is used by the golang runtime
const WATCH_MOUNT_POINT_PATH: &str = "/run/kata-containers/shared/containers/watchable/";
/// Destination path for tmpfs for runtime-rs passthrough file sharing
const WATCH_MOUNT_POINT_PATH_PASSTHROUGH: &str =
"/run/kata-containers/shared/containers/passthrough/watchable/";
/// Represents a single watched storage entry which may have multiple files to watch.
#[derive(Default, Debug, Clone)]
struct Storage {
@ -451,7 +455,7 @@ impl BindWatcher {
) -> Result<()> {
if self.watch_thread.is_none() {
// Virtio-fs shared path is RO by default, so we back the target-mounts by tmpfs.
self.mount(logger).await?;
self.mount(logger).await.context("mount watch directory")?;
// Spawn background thread to monitor changes
self.watch_thread = Some(Self::spawn_watcher(
@ -500,16 +504,28 @@ impl BindWatcher {
}
async fn mount(&self, logger: &Logger) -> Result<()> {
fs::create_dir_all(WATCH_MOUNT_POINT_PATH).await?;
// the watchable directory is created on the host side.
// here we can only check if it exists.
// first we will check the default WATCH_MOUNT_POINT_PATH,
// and then check WATCH_MOUNT_POINT_PATH_PASSTHROUGH
// in turn which are introduced by runtime-rs file sharing.
let watchable_dir = if Path::new(WATCH_MOUNT_POINT_PATH).is_dir() {
WATCH_MOUNT_POINT_PATH
} else if Path::new(WATCH_MOUNT_POINT_PATH_PASSTHROUGH).is_dir() {
WATCH_MOUNT_POINT_PATH_PASSTHROUGH
} else {
return Err(anyhow!("watchable mount source not found"));
};
baremount(
Path::new("tmpfs"),
Path::new(WATCH_MOUNT_POINT_PATH),
Path::new(watchable_dir),
"tmpfs",
MsFlags::empty(),
"",
logger,
)?;
)
.context("baremount watchable mount path")?;
Ok(())
}
@ -520,7 +536,12 @@ impl BindWatcher {
handle.abort();
}
let _ = umount(WATCH_MOUNT_POINT_PATH);
// try umount watchable mount path in turn
if Path::new(WATCH_MOUNT_POINT_PATH).is_dir() {
let _ = umount(WATCH_MOUNT_POINT_PATH);
} else if Path::new(WATCH_MOUNT_POINT_PATH_PASSTHROUGH).is_dir() {
let _ = umount(WATCH_MOUNT_POINT_PATH_PASSTHROUGH);
}
}
}
@ -529,6 +550,7 @@ mod tests {
use super::*;
use crate::mount::is_mounted;
use nix::unistd::{Gid, Uid};
use scopeguard::defer;
use std::fs;
use std::thread;
use test_utils::skip_if_not_root;
@ -1275,13 +1297,19 @@ mod tests {
let logger = slog::Logger::root(slog::Discard, o!());
let mut watcher = BindWatcher::default();
watcher.mount(&logger).await.unwrap();
assert!(is_mounted(WATCH_MOUNT_POINT_PATH).unwrap());
for mount_point in [WATCH_MOUNT_POINT_PATH, WATCH_MOUNT_POINT_PATH_PASSTHROUGH] {
fs::create_dir_all(mount_point).unwrap();
// ensure the watchable directory is deleted.
defer!(fs::remove_dir_all(mount_point).unwrap());
thread::sleep(Duration::from_millis(20));
watcher.mount(&logger).await.unwrap();
assert!(is_mounted(mount_point).unwrap());
watcher.cleanup();
assert!(!is_mounted(WATCH_MOUNT_POINT_PATH).unwrap());
thread::sleep(Duration::from_millis(20));
watcher.cleanup();
assert!(!is_mounted(mount_point).unwrap());
}
}
#[tokio::test]
@ -1289,6 +1317,10 @@ mod tests {
async fn spawn_thread() {
skip_if_not_root!();
fs::create_dir_all(WATCH_MOUNT_POINT_PATH).unwrap();
// ensure the watchable directory is deleted.
defer!(fs::remove_dir_all(WATCH_MOUNT_POINT_PATH).unwrap());
let source_dir = tempfile::tempdir().unwrap();
fs::write(source_dir.path().join("1.txt"), "one").unwrap();
@ -1319,6 +1351,10 @@ mod tests {
async fn verify_container_cleanup_watching() {
skip_if_not_root!();
fs::create_dir_all(WATCH_MOUNT_POINT_PATH).unwrap();
// ensure the watchable directory is deleted.
defer!(fs::remove_dir_all(WATCH_MOUNT_POINT_PATH).unwrap());
let source_dir = tempfile::tempdir().unwrap();
fs::write(source_dir.path().join("1.txt"), "one").unwrap();

2
src/libs/Cargo.lock generated
View File

@ -413,6 +413,7 @@ dependencies = [
"byte-unit",
"glob",
"lazy_static",
"nix 0.24.2",
"num_cpus",
"oci",
"regex",
@ -421,6 +422,7 @@ dependencies = [
"slog",
"slog-scope",
"tempfile",
"test-utils",
"thiserror",
"toml",
]

View File

@ -27,6 +27,8 @@ oci = { path = "../oci" }
[dev-dependencies]
tempfile = "3"
test-utils = { path = "../test-utils" }
nix = "0.24.2"
[features]
default = []

View File

@ -10,20 +10,39 @@ use crate::annotations;
use crate::container::ContainerType;
use std::str::FromStr;
// K8S_EMPTY_DIR is the k8s specific path for `empty-dir` volumes
// K8S_EMPTY_DIR is the K8s specific path for `empty-dir` volumes
const K8S_EMPTY_DIR: &str = "kubernetes.io~empty-dir";
// K8S_CONFIGMAP is the K8s specific path for `configmap` volumes
const K8S_CONFIGMAP: &str = "kubernetes.io~configmap";
// K8S_SECRET is the K8s specific path for `secret` volumes
const K8S_SECRET: &str = "kubernetes.io~secret";
/// Check whether the path is a K8S empty directory.
/// Check whether the path is a K8s empty directory.
pub fn is_empty_dir<P: AsRef<Path>>(path: P) -> bool {
    // Delegate to the generic special-directory check with the
    // `empty-dir` marker segment.
    let path = path.as_ref();
    is_special_dir(path, K8S_EMPTY_DIR)
}
/// Check whether the path is a K8s configmap volume
/// (a path under a `kubernetes.io~configmap` directory).
pub fn is_configmap<P: AsRef<Path>>(path: P) -> bool {
    let path = path.as_ref();
    is_special_dir(path, K8S_CONFIGMAP)
}
/// Check whether the path is a K8s secret volume
/// (a path under a `kubernetes.io~secret` directory).
pub fn is_secret<P: AsRef<Path>>(path: P) -> bool {
    let path = path.as_ref();
    is_special_dir(path, K8S_SECRET)
}
/// Check whether the path is a K8s empty directory, configmap, or secret.
///
/// For a K8S EmptyDir, Kubernetes mounts
/// For example, given a K8s EmptyDir, Kubernetes mounts
/// "/var/lib/kubelet/pods/<id>/volumes/kubernetes.io~empty-dir/<volumeMount name>"
/// to "/<mount-point>".
pub fn is_empty_dir<P: AsRef<Path>>(path: P) -> bool {
pub fn is_special_dir<P: AsRef<Path>>(path: P, dir_type: &str) -> bool {
let path = path.as_ref();
if let Some(parent) = path.parent() {
if let Some(pname) = parent.file_name() {
if pname == K8S_EMPTY_DIR && parent.parent().is_some() {
if pname == dir_type && parent.parent().is_some() {
return true;
}
}
@ -77,10 +96,119 @@ pub fn container_type_with_id(spec: &oci::Spec) -> (ContainerType, Option<String
(container_type, sid)
}
// count_files returns the number of files within a given path,
// descending into subdirectories. A plain file counts as 1.
// If the total number of files observed is greater than `limit`,
// the scan stops early and -1 is returned as a sentinel.
fn count_files<P: AsRef<Path>>(path: P, limit: i32) -> std::io::Result<i32> {
    // Resolve the path first; this also verifies that it exists.
    let src = std::fs::canonicalize(path)?;

    // Special case if this is just a file, not a directory:
    if !src.is_dir() {
        return Ok(1);
    }

    let mut num_files = 0;

    for entry in std::fs::read_dir(src)? {
        let file = entry?;
        let p = file.path();

        if p.is_dir() {
            let nested = count_files(&p, limit)?;
            // Propagate the over-limit sentinel from the nested directory.
            // Adding -1 to the running total would silently *decrease* the
            // count and let deeply nested directories evade the limit.
            if nested < 0 {
                return Ok(-1);
            }
            num_files += nested;
        } else {
            num_files += 1;
        }

        if num_files > limit {
            return Ok(-1);
        }
    }

    Ok(num_files)
}
/// Check if a volume should be processed as a watchable volume,
/// which adds inotify-like function for virtio-fs.
pub fn is_watchable_mount<P: AsRef<Path>>(path: P) -> bool {
    // Only K8s secrets and configmaps are candidates for watching.
    let is_candidate = is_secret(&path) || is_configmap(&path);
    if !is_candidate {
        return false;
    }

    // We have a cap on the number of FDs which can be present in a mount
    // to determine if it is watchable. A similar check exists within the
    // agent, which may or may not help handle the case where extra files
    // are added to a mount after the fact.
    match count_files(&path, 8) {
        // count_files returns -1 when the mount has too many files.
        Ok(n) => n > 0,
        Err(_) => false,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{annotations, container};
use std::fs;
use test_utils::skip_if_not_root;
#[test]
fn test_is_watchable_mount() {
    // Requires root: is_watchable_mount reads paths that may need privileges.
    skip_if_not_root!();

    // Empty path is never watchable.
    let result = is_watchable_mount("");
    assert!(!result);

    // path does not exist, failure expected:
    let result = is_watchable_mount("/var/lib/kubelet/pods/5f0861a0-a987-4a3a-bb0f-1058ddb9678f/volumes/kubernetes.io~empty-dir/foobar");
    assert!(!result);

    let test_tmp_dir = tempfile::tempdir().expect("failed to create tempdir");

    // Verify secret is successful (single file mount):
    // /tmppath/kubernetes.io~secret/super-secret-thing
    let secret_path = test_tmp_dir.path().join(K8S_SECRET);
    let result = fs::create_dir_all(&secret_path);
    assert!(result.is_ok());
    let secret_file = &secret_path.join("super-secret-thing");
    let result = fs::File::create(secret_file);
    assert!(result.is_ok());
    let result = is_watchable_mount(secret_file);
    assert!(result);

    // Verify that if we have too many files, it will no longer be watchable:
    // /tmp/kubernetes.io~configmap/amazing-dir-of-configs/
    // | - c0
    // | - c1
    // ...
    // | - c7
    // should be okay.
    //
    // 9 files should cause the mount to be deemed "not watchable"
    let configmap_path = test_tmp_dir
        .path()
        .join(K8S_CONFIGMAP)
        .join("amazing-dir-of-configs");
    let result = fs::create_dir_all(&configmap_path);
    assert!(result.is_ok());

    // not a watchable mount if no files available.
    let result = is_watchable_mount(&configmap_path);
    assert!(!result);

    // Each added file (up to 8 total) keeps the mount watchable.
    for i in 0..8 {
        let configmap_file = &configmap_path.join(format!("c{}", i));
        let result = fs::File::create(configmap_file);
        assert!(result.is_ok());
        let result = is_watchable_mount(&configmap_path);
        assert!(result);
    }

    // The 9th file pushes the mount over the 8-file cap.
    let configmap_file = &configmap_path.join("too_much_files");
    let result = fs::File::create(configmap_file);
    assert!(result.is_ok());
    let result = is_watchable_mount(&configmap_path);
    assert!(!result);
}
#[test]
fn test_is_empty_dir() {
@ -103,6 +231,36 @@ mod tests {
assert!(is_empty_dir(empty_dir));
}
#[test]
fn test_is_configmap() {
    // Table of (path, expected) cases for configmap volume detection.
    let cases = [
        ("/volumes/kubernetes.io~configmap/cm", true),
        ("/volumes/kubernetes.io~configmap//cm", true),
        ("/volumes/kubernetes.io~configmap-test/cm", false),
        ("/volumes/kubernetes.io~configmap", false),
    ];
    for &(path, expected) in cases.iter() {
        assert_eq!(is_configmap(path), expected, "path: {}", path);
    }
}
#[test]
fn test_is_secret() {
    // Table of (path, expected) cases for secret volume detection.
    let cases = [
        ("/volumes/kubernetes.io~secret/test-serect", true),
        ("/volumes/kubernetes.io~secret//test-serect", true),
        ("/volumes/kubernetes.io~secret-test/test-serect", false),
        ("/volumes/kubernetes.io~secret", false),
    ];
    for &(path, expected) in cases.iter() {
        assert_eq!(is_secret(path), expected, "path: {}", path);
    }
}
#[test]
fn test_container_type() {
let sid = "sid".to_string();

View File

@ -1169,6 +1169,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.8"
@ -1219,12 +1225,26 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyperlocal"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c"
dependencies = [
"futures-util",
"hex",
"hyper",
"pin-project",
"tokio",
]
[[package]]
name = "hypervisor"
version = "0.1.0"
@ -1858,6 +1878,26 @@ dependencies = [
"indexmap",
]
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -2268,6 +2308,7 @@ dependencies = [
"anyhow",
"common",
"hyper",
"hyperlocal",
"kata-types",
"lazy_static",
"linux_container",

View File

@ -25,7 +25,10 @@ const VIRTIO_FS: &str = "virtio-fs";
const INLINE_VIRTIO_FS: &str = "inline-virtio-fs";
const KATA_HOST_SHARED_DIR: &str = "/run/kata-containers/shared/sandboxes/";
/// share fs (for example virtio-fs) mount path in the guest
const KATA_GUEST_SHARE_DIR: &str = "/run/kata-containers/shared/containers/";
pub(crate) const DEFAULT_KATA_GUEST_SANDBOX_DIR: &str = "/run/kata-containers/sandbox/";
const PASSTHROUGH_FS_DIR: &str = "passthrough";
@ -51,10 +54,12 @@ pub struct ShareFsVolumeConfig {
pub source: String,
pub target: String,
pub readonly: bool,
pub mount_options: Vec<String>,
}
pub struct ShareFsMountResult {
pub guest_path: String,
pub storages: Vec<agent::Storage>,
}
#[async_trait]

View File

@ -42,12 +42,12 @@ pub(crate) fn share_to_guest(
Ok(do_get_guest_path(target, cid, is_volume))
}
pub(crate) fn get_host_ro_shared_path(id: &str) -> PathBuf {
Path::new(KATA_HOST_SHARED_DIR).join(id).join("ro")
pub(crate) fn get_host_ro_shared_path(sid: &str) -> PathBuf {
Path::new(KATA_HOST_SHARED_DIR).join(sid).join("ro")
}
pub(crate) fn get_host_rw_shared_path(id: &str) -> PathBuf {
Path::new(KATA_HOST_SHARED_DIR).join(id).join("rw")
pub(crate) fn get_host_rw_shared_path(sid: &str) -> PathBuf {
Path::new(KATA_HOST_SHARED_DIR).join(sid).join("rw")
}
fn do_get_guest_any_path(target: &str, cid: &str, is_volume: bool, is_virtiofs: bool) -> String {
@ -66,11 +66,11 @@ fn do_get_guest_any_path(target: &str, cid: &str, is_volume: bool, is_virtiofs:
path.to_str().unwrap().to_string()
}
fn do_get_guest_path(target: &str, cid: &str, is_volume: bool) -> String {
pub(crate) fn do_get_guest_path(target: &str, cid: &str, is_volume: bool) -> String {
do_get_guest_any_path(target, cid, is_volume, false)
}
fn do_get_host_path(
pub(crate) fn do_get_host_path(
target: &str,
sid: &str,
cid: &str,

View File

@ -4,10 +4,21 @@
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{Context, Result};
use agent::Storage;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use kata_types::k8s::is_watchable_mount;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use super::{utils, ShareFsMount, ShareFsMountResult, ShareFsRootfsConfig, ShareFsVolumeConfig};
const WATCHABLE_PATH_NAME: &str = "watchable";
const WATCHABLE_BIND_DEV_TYPE: &str = "watchable-bind";
use super::{
utils, ShareFsMount, ShareFsMountResult, ShareFsRootfsConfig, ShareFsVolumeConfig,
KATA_GUEST_SHARE_DIR, PASSTHROUGH_FS_DIR,
};
pub struct VirtiofsShareMount {
id: String,
@ -32,11 +43,14 @@ impl ShareFsMount for VirtiofsShareMount {
false,
)
.context("share to guest")?;
Ok(ShareFsMountResult { guest_path })
Ok(ShareFsMountResult {
guest_path,
storages: vec![],
})
}
async fn share_volume(&self, config: ShareFsVolumeConfig) -> Result<ShareFsMountResult> {
let guest_path = utils::share_to_guest(
let mut guest_path = utils::share_to_guest(
&config.source,
&config.target,
&self.id,
@ -45,6 +59,61 @@ impl ShareFsMount for VirtiofsShareMount {
true,
)
.context("share to guest")?;
Ok(ShareFsMountResult { guest_path })
// watchable mounts
if is_watchable_mount(&config.source) {
// Create path in shared directory for creating watchable mount:
let host_rw_path = utils::get_host_rw_shared_path(&self.id);
// "/run/kata-containers/shared/sandboxes/$sid/rw/passthrough/watchable"
let watchable_host_path = Path::new(&host_rw_path)
.join(PASSTHROUGH_FS_DIR)
.join(WATCHABLE_PATH_NAME);
fs::create_dir_all(&watchable_host_path).context(format!(
"unable to create watchable path: {:?}",
&watchable_host_path,
))?;
fs::set_permissions(watchable_host_path, fs::Permissions::from_mode(0o750))?;
// path: /run/kata-containers/shared/containers/passthrough/watchable/config-map-name
let file_name = Path::new(&guest_path)
.file_name()
.context("get file name from guest path")?;
let watchable_guest_mount = Path::new(KATA_GUEST_SHARE_DIR)
.join(PASSTHROUGH_FS_DIR)
.join(WATCHABLE_PATH_NAME)
.join(file_name)
.into_os_string()
.into_string()
.map_err(|e| anyhow!("failed to get watchable guest mount path {:?}", e))?;
let watchable_storage: Storage = Storage {
driver: String::from(WATCHABLE_BIND_DEV_TYPE),
driver_options: Vec::new(),
source: guest_path,
fs_type: String::from("bind"),
fs_group: None,
options: config.mount_options,
mount_point: watchable_guest_mount.clone(),
};
// Update the guest_path, in order to identify what will
// change in the OCI spec.
guest_path = watchable_guest_mount;
let storages = vec![watchable_storage];
return Ok(ShareFsMountResult {
guest_path,
storages,
});
}
Ok(ShareFsMountResult {
guest_path,
storages: vec![],
})
}
}

View File

@ -20,6 +20,7 @@ use crate::share_fs::{ShareFs, ShareFsVolumeConfig};
// skip the volumes whose source had already set to guest share dir.
pub(crate) struct ShareFsVolume {
mounts: Vec<oci::Mount>,
storages: Vec<agent::Storage>,
}
impl ShareFsVolume {
@ -31,7 +32,10 @@ impl ShareFsVolume {
let file_name = Path::new(&m.source).file_name().unwrap().to_str().unwrap();
let file_name = generate_mount_path(cid, file_name);
let mut volume = Self { mounts: vec![] };
let mut volume = Self {
mounts: vec![],
storages: vec![],
};
match share_fs {
None => {
let mut need_copy = false;
@ -82,10 +86,15 @@ impl ShareFsVolume {
source: m.source.clone(),
target: file_name,
readonly: false,
mount_options: m.options.clone(),
})
.await
.context("share fs volume")?;
// set storages for the volume
volume.storages = mount_result.storages;
// set mount for the volume
volume.mounts.push(oci::Mount {
destination: m.destination.clone(),
r#type: "bind".to_string(),
@ -104,7 +113,7 @@ impl Volume for ShareFsVolume {
}
fn get_storage(&self) -> Result<Vec<agent::Storage>> {
Ok(vec![])
Ok(self.storages.clone())
}
fn cleanup(&self) -> Result<()> {