agent: Initial watchable-bind implementation

Add support for the watchable-bind storage driver. When watchable-bind storage
is present, the agent will create a watchable path in a tmpfs and poll the
watchable-bind source to keep this new mount point up to date.

This polling allows the agent to present the mount point to the
container, enabling inotify usage by the container workload.
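
The polling loop itself is simple. A minimal sketch of its shape, assuming
tokio, with check_all_mounts() as a hypothetical stand-in for the per-mount
scan performed by the watcher code added below:

    use std::time::Duration;
    use tokio::time;

    /// Hypothetical stand-in for the per-mount scan: copy new/changed files
    /// from the source into the tmpfs-backed target.
    async fn check_all_mounts() -> anyhow::Result<()> {
        Ok(())
    }

    /// Minimal sketch of the poll loop: tick on a fixed interval and log
    /// (rather than propagate) errors so the background task keeps running.
    fn spawn_poll_loop(interval_secs: u64) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = time::interval(Duration::from_secs(interval_secs));
            loop {
                ticker.tick().await;
                if let Err(e) = check_all_mounts().await {
                    eprintln!("watcher check failed: {:?}", e); // the real agent logs via slog
                }
            }
        })
    }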

If a mount becomes too large, either in file count or in overall size,
we stop treating it as watchable and instead treat it as a plain bind
mount. This helps avoid a DoS from growing the tmpfs too large, and limits
the time spent scanning files. If a watchable-bind grows beyond 8 files
(an arbitrary, sane number for certs/secrets) or 1MB (the limit on ConfigMap size),
we treat it as a normal bind mount.
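
For reference, these thresholds correspond to two constants in the new
src/agent/src/watcher.rs below. A minimal sketch of the demotion decision
(the helper name is illustrative, not part of the patch):

    /// Limits mirrored from src/agent/src/watcher.rs.
    const MAX_ENTRIES_PER_STORAGE: usize = 8;
    const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024; // 1 MiB, the ConfigMap size limit

    /// Illustrative helper: once either limit is exceeded, the mount stops
    /// being watched and is simply bind-mounted into the container instead.
    fn still_watchable(file_count: usize, total_size: u64) -> bool {
        file_count <= MAX_ENTRIES_PER_STORAGE && total_size <= MAX_SIZE_PER_WATCHABLE_MOUNT
    }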

Fixes: #1879

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
Signed-off-by: Samuel Ortiz <samuel.e.ortiz@protonmail.com>

agent: watcher: SandboxStorages check loop cleanup

Maksym Pavlenko, 2021-05-21 10:47:01 -07:00 (committed by Eric Ernst)
parent 57c0cee0a5
commit 6a93e5d593
8 changed files with 855 additions and 14 deletions

src/agent/Cargo.lock (generated)

@@ -51,6 +51,17 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73"
[[package]]
name = "async-recursion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
dependencies = [
"proc-macro2 1.0.26",
"quote 1.0.9",
"syn 1.0.72",
]
[[package]]
name = "async-trait"
version = "0.1.50"
@@ -507,6 +518,7 @@ name = "kata-agent"
version = "0.1.0"
dependencies = [
"anyhow",
"async-recursion",
"async-trait",
"capctl",
"cgroups-rs",
@@ -1563,6 +1575,7 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",

src/agent/Cargo.toml

@@ -20,11 +20,16 @@ scan_fmt = "0.2.3"
scopeguard = "1.0.0"
regex = "1"
# Async helpers
async-trait = "0.1.42"
tokio = { version = "1.2.0", features = ["rt", "rt-multi-thread", "sync", "macros", "io-util", "time", "signal", "io-std", "process", "fs"] }
async-recursion = "0.3.2"
futures = "0.3.12"
netlink-sys = { version = "0.6.0", features = ["tokio_socket",]}
# Async runtime
tokio = { version = "1.2.0", features = ["full"] }
tokio-vsock = "0.3.1"
netlink-sys = { version = "0.6.0", features = ["tokio_socket",]}
# Because the author has no time to maintain the crate, we switch the dependency to github,
# Once the new version released on crates.io, we switch it back.
# https://github.com/little-dude/netlink/issues/161

src/agent/src/main.rs

@@ -52,6 +52,7 @@ mod test_utils;
mod uevent;
mod util;
mod version;
mod watcher;
use mount::{cgroups_mount, general_mount};
use sandbox::Sandbox;

src/agent/src/mount.rs

@@ -6,13 +6,16 @@
use std::collections::HashMap;
use std::ffi::CString;
use std::fs;
use std::fs::File;
use std::io;
use std::io::{BufRead, BufReader};
use std::iter;
use std::os::unix::fs::{MetadataExt, PermissionsExt};
use std::path::Path;
use std::ptr::null;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use libc::{c_void, mount};
@@ -20,8 +23,6 @@ use nix::mount::{self, MsFlags};
use nix::unistd::Gid;
use regex::Regex;
use std::fs::File;
use std::io::{BufRead, BufReader};
use crate::device::{
get_scsi_device_name, get_virtio_blk_pci_device_name, online_device, wait_for_pmem_device,
@@ -42,6 +43,7 @@ pub const DRIVER_SCSI_TYPE: &str = "scsi";
pub const DRIVER_NVDIMM_TYPE: &str = "nvdimm";
pub const DRIVER_EPHEMERAL_TYPE: &str = "ephemeral";
pub const DRIVER_LOCAL_TYPE: &str = "local";
pub const DRIVER_WATCHABLE_BIND_TYPE: &str = "watchable-bind";
pub const TYPE_ROOTFS: &str = "rootfs";
@@ -132,7 +134,7 @@ lazy_static! {
];
}
pub const STORAGE_HANDLER_LIST: [&str; 8] = [
pub const STORAGE_HANDLER_LIST: &[&str] = &[
DRIVER_BLK_TYPE,
DRIVER_9P_TYPE,
DRIVER_VIRTIOFS_TYPE,
@@ -141,6 +143,7 @@ pub const STORAGE_HANDLER_LIST: [&str; 8] = [
DRIVER_LOCAL_TYPE,
DRIVER_SCSI_TYPE,
DRIVER_NVDIMM_TYPE,
DRIVER_WATCHABLE_BIND_TYPE,
];
#[derive(Debug, Clone)]
@@ -425,6 +428,20 @@ async fn nvdimm_storage_handler(
common_storage_handler(logger, &storage)
}
async fn bind_watcher_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
) -> Result<()> {
let mut locked = sandbox.lock().await;
let container_id = locked.id.clone();
locked
.bind_watcher
.add_container(container_id, iter::once(storage.clone()), logger)
.await
}
// mount_storage performs the mount described by the storage structure.
#[instrument]
fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
@@ -478,7 +495,7 @@ fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
/// Looks for `mount_point` entry in the /proc/mounts.
#[instrument]
fn is_mounted(mount_point: &str) -> Result<bool> {
pub fn is_mounted(mount_point: &str) -> Result<bool> {
let mount_point = mount_point.trim_end_matches('/');
let found = fs::metadata(mount_point).is_ok()
// Looks through /proc/mounts and check if the mount exists
@@ -555,6 +572,11 @@ pub async fn add_storages(
virtio_scsi_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox.clone()).await,
DRIVER_WATCHABLE_BIND_TYPE => {
bind_watcher_storage_handler(&logger, &storage, sandbox.clone()).await?;
// Don't register watch mounts; they're handled separately by the watcher.
Ok(String::new())
}
_ => {
return Err(anyhow!(
"Failed to find the storage handler {}",

src/agent/src/rpc.rs

@@ -253,11 +253,14 @@ impl AgentService {
if req.timeout == 0 {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let ctr = sandbox
.get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?;
ctr.destroy().await?;
sandbox.bind_watcher.remove_container(&cid).await;
sandbox
.get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?
.destroy()
.await?;
remove_container_resources(&mut sandbox)?;
@@ -273,6 +276,7 @@ impl AgentService {
let mut sandbox = s.lock().await;
if let Some(ctr) = sandbox.get_container(&cid2) {
ctr.destroy().await.unwrap();
sandbox.bind_watcher.remove_container(&cid2).await;
tx.send(1).unwrap();
};
});

src/agent/src/sandbox.rs

@@ -9,6 +9,7 @@ use crate::namespace::Namespace;
use crate::netlink::Handle;
use crate::network::Network;
use crate::uevent::{Uevent, UeventMatcher};
use crate::watcher::BindWatcher;
use anyhow::{anyhow, Context, Result};
use libc::pid_t;
use oci::{Hook, Hooks};
@@ -54,6 +55,7 @@ pub struct Sandbox {
pub hooks: Option<Hooks>,
pub event_rx: Arc<Mutex<Receiver<String>>>,
pub event_tx: Option<Sender<String>>,
pub bind_watcher: BindWatcher,
}
impl Sandbox {
@@ -85,6 +87,7 @@ impl Sandbox {
hooks: None,
event_rx,
event_tx: Some(tx),
bind_watcher: BindWatcher::new(),
})
}

src/agent/src/watcher.rs (new file)

@@ -0,0 +1,771 @@
// Copyright (c) 2021 Apple Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::{self, Duration};
use anyhow::{ensure, Context, Result};
use async_recursion::async_recursion;
use nix::mount::{umount, MsFlags};
use slog::{debug, error, Logger};
use crate::mount::BareMount;
use crate::protocols::agent as protos;
/// The maximum number of file system entries the agent will watch for each mount.
const MAX_ENTRIES_PER_STORAGE: usize = 8;
/// The maximum size of a watchable mount in bytes.
const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;
/// How often to check for modified files.
const WATCH_INTERVAL_SECS: u64 = 2;
/// Destination path for tmpfs
const WATCH_MOUNT_POINT_PATH: &str = "/run/kata-containers/shared/containers/watchable/";
/// Represents a single watched storage entry which may have multiple files to watch.
#[derive(Default, Debug, Clone)]
struct Storage {
/// A mount point without inotify capabilities.
source_mount_point: PathBuf,
/// The target mount point, where the watched files will be copied/mirrored
/// when they are changed, added or removed. This will be a subdirectory of a tmpfs.
target_mount_point: PathBuf,
/// Flag to indicate that the Storage should be watched. Storage will be watched until
/// the source becomes too large, either in number of files (>8) or total size (>1MB).
watch: bool,
/// The list of files to watch in the source mount point and update in the target one.
watched_files: HashMap<PathBuf, SystemTime>,
}
impl Drop for Storage {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.target_mount_point);
}
}
impl Storage {
async fn new(storage: protos::Storage) -> Result<Storage> {
let entry = Storage {
source_mount_point: PathBuf::from(&storage.source),
target_mount_point: PathBuf::from(&storage.mount_point),
watch: true,
watched_files: HashMap::new(),
};
Ok(entry)
}
async fn update_target(&self, logger: &Logger, source_path: impl AsRef<Path>) -> Result<()> {
let source_file_path = source_path.as_ref();
let dest_file_path = if self.source_mount_point.is_file() {
// Simple file to file copy
// Assume target mount is a file path
self.target_mount_point.clone()
} else {
let dest_file_path = self.make_target_path(&source_file_path)?;
if let Some(path) = dest_file_path.parent() {
debug!(logger, "Creating destination directory: {}", path.display());
fs::create_dir_all(path)
.await
.with_context(|| format!("Unable to mkdir all for {}", path.display()))?;
}
dest_file_path
};
debug!(
logger,
"Copy from {} to {}",
source_file_path.display(),
dest_file_path.display()
);
fs::copy(&source_file_path, &dest_file_path)
.await
.with_context(|| {
format!(
"Copy from {} to {} failed",
source_file_path.display(),
dest_file_path.display()
)
})?;
Ok(())
}
async fn scan(&mut self, logger: &Logger) -> Result<usize> {
debug!(logger, "Scanning for changes");
let mut remove_list = Vec::new();
let mut updated_files: Vec<PathBuf> = Vec::new();
// Remove deleted files from the tracking list
self.watched_files.retain(|st, _| {
if st.exists() {
true
} else {
remove_list.push(st.to_path_buf());
false
}
});
// Delete from target
for path in remove_list {
// File has been deleted, remove it from target mount
let target = self.make_target_path(path)?;
debug!(logger, "Removing file from mount: {}", target.display());
let _ = fs::remove_file(target).await;
}
// Scan new & changed files
self.scan_path(
logger,
self.source_mount_point.clone().as_path(),
&mut updated_files,
)
.await
.with_context(|| "Scan path failed")?;
// Update identified files:
for path in &updated_files {
self.update_target(logger, path.as_path()).await?;
}
Ok(updated_files.len())
}
#[async_recursion]
async fn scan_path(
&mut self,
logger: &Logger,
path: &Path,
update_list: &mut Vec<PathBuf>,
) -> Result<u64> {
let mut size: u64 = 0;
debug!(logger, "Scanning path: {}", path.display());
if path.is_file() {
let metadata = path
.metadata()
.with_context(|| format!("Failed to query metadata for: {}", path.display()))?;
let modified = metadata
.modified()
.with_context(|| format!("Failed to get modified date for: {}", path.display()))?;
size += metadata.len();
ensure!(
self.watched_files.len() <= MAX_ENTRIES_PER_STORAGE,
"Too many file system entries to watch (must be < {})",
MAX_ENTRIES_PER_STORAGE
);
// Insert will return old entry if any
if let Some(old_st) = self.watched_files.insert(path.to_path_buf(), modified) {
if modified > old_st {
update_list.push(PathBuf::from(&path))
}
} else {
// Storage just added, copy to target
debug!(logger, "New entry: {}", path.display());
update_list.push(PathBuf::from(&path))
}
} else {
// Scan dir recursively
let mut entries = fs::read_dir(path)
.await
.with_context(|| format!("Failed to read dir: {}", path.display()))?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let res_size = self
.scan_path(logger, path.as_path(), update_list)
.await
.with_context(|| format!("Unable to scan inner path: {}", path.display()))?;
size += res_size;
}
}
ensure!(
size <= MAX_SIZE_PER_WATCHABLE_MOUNT,
"Watched mount too large (must be <= {} bytes)",
MAX_SIZE_PER_WATCHABLE_MOUNT,
);
Ok(size)
}
fn make_target_path(&self, source_file_path: impl AsRef<Path>) -> Result<PathBuf> {
let relative_path = source_file_path
.as_ref()
.strip_prefix(&self.source_mount_point)
.with_context(|| {
format!(
"Failed to strip prefix: {} - {}",
source_file_path.as_ref().display().to_string(),
&self.source_mount_point.display()
)
})?;
let dest_file_path = Path::new(&self.target_mount_point).join(relative_path);
Ok(dest_file_path)
}
}
#[derive(Default, Debug)]
struct SandboxStorages(Vec<Storage>);
impl SandboxStorages {
async fn add(
&mut self,
list: impl IntoIterator<Item = protos::Storage>,
logger: &Logger,
) -> Result<()> {
for storage in list.into_iter() {
let entry = Storage::new(storage)
.await
.with_context(|| "Failed to add storage")?;
self.0.push(entry);
}
// Perform initial copy
self.check(logger)
.await
.with_context(|| "Failed to perform initial check")?;
Ok(())
}
async fn check(&mut self, logger: &Logger) -> Result<()> {
for entry in self.0.iter_mut().filter(|e| e.watch) {
if let Err(e) = entry.scan(logger).await {
// If an error was observed, we will stop treating this Storage as being watchable, and
// instead clean up the target-mount files on the tmpfs and bind mount the source_mount_point
// to target_mount_point.
error!(logger, "error observed when watching: {:?}", e);
entry.watch = false;
// Remove destination contents, but not the directory itself, since this is
// assumed to be bind-mounted into a container. If source/mount is a file, no need to cleanup
if entry.target_mount_point.as_path().is_dir() {
for dir_entry in std::fs::read_dir(entry.target_mount_point.as_path())? {
let dir_entry = dir_entry?;
let path = dir_entry.path();
if dir_entry.file_type()?.is_dir() {
tokio::fs::remove_dir_all(path).await?;
} else {
tokio::fs::remove_file(path).await?;
}
}
}
// - Create bind mount from source to destination
BareMount::new(
entry.source_mount_point.to_str().unwrap(),
entry.target_mount_point.to_str().unwrap(),
"bind",
MsFlags::MS_BIND,
"bind",
logger,
)
.mount()?;
}
}
Ok(())
}
}
/// Handles watchable mounts. The watcher will manage one or more mounts for one or more containers. For each
/// mount that is added, the watcher will maintain a list of files to monitor, and periodically checks for new,
/// removed or changed (modified date) files. When a change is identified, the watcher will either copy the new
/// or updated file to a target mount point, or remove the removed file from the target mount point. All WatchableStorage
/// target mount points are expected to reside within a single tmpfs, whose root is created by the BindWatcher.
///
/// This is a temporary workaround to handle config map updates until we get inotify on 9p/virtio-fs.
/// More context on this:
/// - https://github.com/kata-containers/runtime/issues/1505
/// - https://github.com/kata-containers/kata-containers/issues/1879
#[derive(Debug, Default)]
pub struct BindWatcher {
/// Container ID -> Vec of watched entries
sandbox_storages: Arc<Mutex<HashMap<String, SandboxStorages>>>,
watch_thread: Option<task::JoinHandle<()>>,
}
impl Drop for BindWatcher {
fn drop(&mut self) {
self.cleanup();
}
}
impl BindWatcher {
pub fn new() -> BindWatcher {
Default::default()
}
pub async fn add_container(
&mut self,
id: String,
mounts: impl IntoIterator<Item = protos::Storage>,
logger: &Logger,
) -> Result<()> {
if self.watch_thread.is_none() {
// Virtio-fs shared path is RO by default, so we back the target-mounts by tmpfs.
self.mount(logger).await?;
// Spawn background thread to monitor changes
self.watch_thread = Some(Self::spawn_watcher(
logger.clone(),
Arc::clone(&self.sandbox_storages),
WATCH_INTERVAL_SECS,
));
}
self.sandbox_storages
.lock()
.await
.entry(id)
.or_insert_with(SandboxStorages::default)
.add(mounts, logger)
.await
.with_context(|| "Failed to add container")?;
Ok(())
}
pub async fn remove_container(&self, id: &str) {
self.sandbox_storages.lock().await.remove(id);
}
fn spawn_watcher(
logger: Logger,
sandbox_storages: Arc<Mutex<HashMap<String, SandboxStorages>>>,
interval_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(interval_secs));
loop {
interval.tick().await;
debug!(&logger, "Looking for changed files");
for (_, entries) in sandbox_storages.lock().await.iter_mut() {
if let Err(err) = entries.check(&logger).await {
// Don't fail the background loop; just log the error instead.
error!(logger, "Check failed: {}", err);
}
}
}
})
}
async fn mount(&self, logger: &Logger) -> Result<()> {
fs::create_dir_all(WATCH_MOUNT_POINT_PATH).await?;
BareMount::new(
"tmpfs",
WATCH_MOUNT_POINT_PATH,
"tmpfs",
MsFlags::empty(),
"",
logger,
)
.mount()?;
Ok(())
}
fn cleanup(&mut self) {
if let Some(handle) = self.watch_thread.take() {
// Stop our background thread
handle.abort();
}
let _ = umount(WATCH_MOUNT_POINT_PATH);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mount::is_mounted;
use crate::skip_if_not_root;
use std::fs;
use std::thread;
#[tokio::test]
async fn watch_entries() {
skip_if_not_root!();
// If there's an error with an entry, let's make sure it is removed, and that the
// mount-destination behaves like a standard bind-mount.
// Create an entries vector with three storage objects: storage, storage1, storage2.
// We'll first verify each is evaluated correctly, then grow the first entry's contents
// so it fails the mount size check (>1MB) (testing failure handling for a mount that is a directory).
// We'll then similarly break storage2 (testing failure handling for a mount that is
// a single file). Finally, we'll verify that storage1 continues to be watchable.
let source_dir = tempfile::tempdir().unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let storage = protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
};
std::fs::File::create(source_dir.path().join("small.txt"))
.unwrap()
.set_len(10)
.unwrap();
let source_dir1 = tempfile::tempdir().unwrap();
let dest_dir1 = tempfile::tempdir().unwrap();
let storage1 = protos::Storage {
source: source_dir1.path().display().to_string(),
mount_point: dest_dir1.path().display().to_string(),
..Default::default()
};
std::fs::File::create(source_dir1.path().join("large.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
.unwrap();
// And finally, create a single file mount:
let source_dir2 = tempfile::tempdir().unwrap();
let dest_dir2 = tempfile::tempdir().unwrap();
let source_path = source_dir2.path().join("mounted-file");
let dest_path = dest_dir2.path().join("mounted-file");
let mounted_file = std::fs::File::create(&source_path).unwrap();
mounted_file.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT).unwrap();
let storage2 = protos::Storage {
source: source_path.display().to_string(),
mount_point: dest_path.display().to_string(),
..Default::default()
};
let logger = slog::Logger::root(slog::Discard, o!());
let mut entries = SandboxStorages {
..Default::default()
};
entries
.add(std::iter::once(storage), &logger)
.await
.unwrap();
entries
.add(std::iter::once(storage1), &logger)
.await
.unwrap();
entries
.add(std::iter::once(storage2), &logger)
.await
.unwrap();
// Check that there are three entries, and that the
// destination (mount point) matches what we expect for
// the first:
assert!(entries.check(&logger).await.is_ok());
assert_eq!(entries.0.len(), 3);
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 1);
// Add a second file which will trip file size check:
std::fs::File::create(source_dir.path().join("big.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
.unwrap();
assert!(entries.check(&logger).await.is_ok());
// Verify Storage 0 is no longer going to be watched:
assert!(!entries.0[0].watch);
// Verify that the directory has two entries:
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 2);
// Verify that the directory is a bind mount. Add an entry without calling check,
// and verify that the destination directory picks it up for the mount that is no
// longer being watched (storage), but not for the one still being watched (storage1):
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
fs::write(source_dir1.path().join("2.txt"), "updated").unwrap();
assert_eq!(std::fs::read_dir(source_dir.path()).unwrap().count(), 3);
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 3);
assert_eq!(std::fs::read_dir(source_dir1.path()).unwrap().count(), 2);
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 1);
// Verify that storage1 is still working. After running check, we expect the
// number of entries to increase by one:
assert!(entries.check(&logger).await.is_ok());
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 2);
// Break storage2 by increasing the file size
mounted_file
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 10)
.unwrap();
assert!(entries.check(&logger).await.is_ok());
// Verify Storage 2 is no longer going to be watched:
assert!(!entries.0[2].watch);
// Verify bind mount is working -- let's write to the file and observe output:
fs::write(&source_path, "updated").unwrap();
assert_eq!(fs::read_to_string(&source_path).unwrap(), "updated");
}
#[tokio::test]
async fn watch_directory_too_large() {
let source_dir = tempfile::tempdir().unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let mut entry = Storage::new(protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
})
.await
.unwrap();
let logger = slog::Logger::root(slog::Discard, o!());
// Create a file that is too large:
std::fs::File::create(source_dir.path().join("big.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 1)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
std::fs::File::create(source_dir.path().join("big.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT - 1)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_ok());
std::fs::File::create(source_dir.path().join("too-big.txt"))
.unwrap()
.set_len(2)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();
// Up to eight files should be okay:
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
fs::write(source_dir.path().join("2.txt"), "updated").unwrap();
fs::write(source_dir.path().join("3.txt"), "updated").unwrap();
fs::write(source_dir.path().join("4.txt"), "updated").unwrap();
fs::write(source_dir.path().join("5.txt"), "updated").unwrap();
fs::write(source_dir.path().join("6.txt"), "updated").unwrap();
fs::write(source_dir.path().join("7.txt"), "updated").unwrap();
fs::write(source_dir.path().join("8.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 8);
// Nine files is too many:
fs::write(source_dir.path().join("9.txt"), "updated").unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
}
#[tokio::test]
async fn watch_directory() {
// Prepare source directory:
// ./tmp/1.txt
// ./tmp/A/B/1.txt
let source_dir = tempfile::tempdir().unwrap();
fs::write(source_dir.path().join("1.txt"), "one").unwrap();
fs::create_dir_all(source_dir.path().join("A/B")).unwrap();
fs::write(source_dir.path().join("A/B/1.txt"), "two").unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let mut entry = Storage::new(protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
})
.await
.unwrap();
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 2);
// Should copy no files since nothing is changed since last check
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
// Should copy 1 file
thread::sleep(Duration::from_secs(1));
fs::write(source_dir.path().join("A/B/1.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
assert_eq!(
fs::read_to_string(dest_dir.path().join("A/B/1.txt")).unwrap(),
"updated"
);
// Should copy no new files after copy happened
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
// Update another file
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
}
#[tokio::test]
async fn watch_file() {
let source_dir = tempfile::tempdir().unwrap();
let source_file = source_dir.path().join("1.txt");
fs::write(&source_file, "one").unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let dest_file = dest_dir.path().join("1.txt");
let mut entry = Storage::new(protos::Storage {
source: source_file.display().to_string(),
mount_point: dest_file.display().to_string(),
..Default::default()
})
.await
.unwrap();
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
thread::sleep(Duration::from_secs(1));
fs::write(&source_file, "two").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
assert_eq!(fs::read_to_string(&dest_file).unwrap(), "two");
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
}
#[tokio::test]
async fn delete_file() {
let source_dir = tempfile::tempdir().unwrap();
let source_file = source_dir.path().join("1.txt");
fs::write(&source_file, "one").unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let target_file = dest_dir.path().join("1.txt");
let mut entry = Storage::new(protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
})
.await
.unwrap();
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
assert_eq!(entry.watched_files.len(), 1);
assert!(target_file.exists());
assert!(entry.watched_files.contains_key(&source_file));
// Remove source file
fs::remove_file(&source_file).unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
assert_eq!(entry.watched_files.len(), 0);
assert!(!target_file.exists());
}
#[tokio::test]
async fn make_target_path() {
let source_dir = tempfile::tempdir().unwrap();
let target_dir = tempfile::tempdir().unwrap();
let source_dir = source_dir.path();
let target_dir = target_dir.path();
let entry = Storage::new(protos::Storage {
source: source_dir.display().to_string(),
mount_point: target_dir.display().to_string(),
..Default::default()
})
.await
.unwrap();
assert_eq!(
entry.make_target_path(source_dir.join("1.txt")).unwrap(),
target_dir.join("1.txt")
);
assert_eq!(
entry
.make_target_path(source_dir.join("a/b/2.txt"))
.unwrap(),
target_dir.join("a/b/2.txt")
);
}
#[tokio::test]
async fn create_tmpfs() {
skip_if_not_root!();
let logger = slog::Logger::root(slog::Discard, o!());
let mut watcher = BindWatcher::default();
watcher.mount(&logger).await.unwrap();
assert!(is_mounted(WATCH_MOUNT_POINT_PATH).unwrap());
watcher.cleanup();
assert!(!is_mounted(WATCH_MOUNT_POINT_PATH).unwrap());
}
#[tokio::test]
async fn spawn_thread() {
skip_if_not_root!();
let source_dir = tempfile::tempdir().unwrap();
fs::write(source_dir.path().join("1.txt"), "one").unwrap();
let dest_dir = tempfile::tempdir().unwrap();
let storage = protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
};
let logger = slog::Logger::root(slog::Discard, o!());
let mut watcher = BindWatcher::default();
watcher
.add_container("test".into(), std::iter::once(storage), &logger)
.await
.unwrap();
thread::sleep(Duration::from_secs(WATCH_INTERVAL_SECS));
let out = fs::read_to_string(dest_dir.path().join("1.txt")).unwrap();
assert_eq!(out, "one");
}
}

Cargo.lock (generated)

@@ -506,9 +506,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.94"
version = "0.2.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18794a8ad5b29321f790b55d93dfba91e125cb1a9edbd4f8e3150acc771c1a5e"
checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6"
[[package]]
name = "log"
@@ -536,6 +536,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
@@ -624,6 +633,19 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c3728fec49d363a50a8828a190b379a446cc5cf085c06259bbbeb34447e4ec7"
dependencies = [
"bitflags",
"cc",
"cfg-if 1.0.0",
"libc",
"memoffset",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@@ -999,7 +1021,7 @@ dependencies = [
"inotify",
"lazy_static",
"libc",
"nix 0.17.0",
"nix 0.21.0",
"oci",
"path-absolutize",
"protobuf",