|
|
|
@@ -3,26 +3,28 @@
|
|
|
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
#![allow(clippy::unknown_clippy_lints)]
|
|
|
|
|
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::SystemTime;
|
|
|
|
|
|
|
|
|
|
use anyhow::{ensure, Context, Result};
|
|
|
|
|
use async_recursion::async_recursion;
|
|
|
|
|
use nix::mount::{umount, MsFlags};
|
|
|
|
|
use slog::{debug, error, info, warn, Logger};
|
|
|
|
|
use thiserror::Error;
|
|
|
|
|
use tokio::fs;
|
|
|
|
|
use tokio::sync::Mutex;
|
|
|
|
|
use tokio::task;
|
|
|
|
|
use tokio::time::{self, Duration};
|
|
|
|
|
|
|
|
|
|
use anyhow::{ensure, Context, Result};
|
|
|
|
|
use async_recursion::async_recursion;
|
|
|
|
|
use nix::mount::{umount, MsFlags};
|
|
|
|
|
use slog::{debug, error, Logger};
|
|
|
|
|
|
|
|
|
|
use crate::mount::BareMount;
|
|
|
|
|
use crate::protocols::agent as protos;
|
|
|
|
|
|
|
|
|
|
/// The maximum number of file system entries agent will watch for each mount.
|
|
|
|
|
const MAX_ENTRIES_PER_STORAGE: usize = 8;
|
|
|
|
|
const MAX_ENTRIES_PER_STORAGE: usize = 16;
|
|
|
|
|
|
|
|
|
|
/// The maximum size of a watchable mount in bytes.
|
|
|
|
|
const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;
|
|
|
|
@@ -44,15 +46,35 @@ struct Storage {
|
|
|
|
|
target_mount_point: PathBuf,
|
|
|
|
|
|
|
|
|
|
/// Flag to indicate that the Storage should be watched. Storage will be watched until
|
|
|
|
|
/// the source becomes too large, either in number of files (>8) or total size (>1MB).
|
|
|
|
|
/// the source becomes too large, either in number of files (>16) or total size (>1MB).
|
|
|
|
|
watch: bool,
|
|
|
|
|
|
|
|
|
|
/// The list of files to watch from the source mount point and updated in the target one.
|
|
|
|
|
watched_files: HashMap<PathBuf, SystemTime>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Error, Debug)]
|
|
|
|
|
pub enum WatcherError {
|
|
|
|
|
#[error(
|
|
|
|
|
"Too many file system entries within to watch within: {mnt} ({count} must be < {})",
|
|
|
|
|
MAX_ENTRIES_PER_STORAGE
|
|
|
|
|
)]
|
|
|
|
|
MountTooManyFiles { count: usize, mnt: String },
|
|
|
|
|
|
|
|
|
|
#[error(
|
|
|
|
|
"Mount too large to watch: {mnt} ({size} must be < {})",
|
|
|
|
|
MAX_SIZE_PER_WATCHABLE_MOUNT
|
|
|
|
|
)]
|
|
|
|
|
MountTooLarge { size: u64, mnt: String },
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Drop for Storage {
|
|
|
|
|
fn drop(&mut self) {
|
|
|
|
|
if !&self.watch {
|
|
|
|
|
// If we weren't watching this storage entry, it means that a bind mount
|
|
|
|
|
// was created.
|
|
|
|
|
let _ = umount(&self.target_mount_point);
|
|
|
|
|
}
|
|
|
|
|
let _ = std::fs::remove_dir_all(&self.target_mount_point);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -65,7 +87,6 @@ impl Storage {
|
|
|
|
|
watch: true,
|
|
|
|
|
watched_files: HashMap::new(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(entry)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -143,7 +164,9 @@ impl Storage {
|
|
|
|
|
|
|
|
|
|
// Update identified files:
|
|
|
|
|
for path in &updated_files {
|
|
|
|
|
self.update_target(logger, path.as_path()).await?;
|
|
|
|
|
if let Err(e) = self.update_target(logger, path.as_path()).await {
|
|
|
|
|
error!(logger, "failure in update_target: {:?}", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(updated_files.len())
|
|
|
|
@@ -172,8 +195,10 @@ impl Storage {
|
|
|
|
|
|
|
|
|
|
ensure!(
|
|
|
|
|
self.watched_files.len() <= MAX_ENTRIES_PER_STORAGE,
|
|
|
|
|
"Too many file system entries to watch (must be < {})",
|
|
|
|
|
MAX_ENTRIES_PER_STORAGE
|
|
|
|
|
WatcherError::MountTooManyFiles {
|
|
|
|
|
count: self.watched_files.len(),
|
|
|
|
|
mnt: self.source_mount_point.display().to_string()
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Insert will return old entry if any
|
|
|
|
@@ -201,10 +226,13 @@ impl Storage {
|
|
|
|
|
size += res_size;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ensure!(
|
|
|
|
|
size <= MAX_SIZE_PER_WATCHABLE_MOUNT,
|
|
|
|
|
"Too many file system entries to watch (must be < {})",
|
|
|
|
|
MAX_SIZE_PER_WATCHABLE_MOUNT,
|
|
|
|
|
WatcherError::MountTooLarge {
|
|
|
|
|
size,
|
|
|
|
|
mnt: self.source_mount_point.display().to_string()
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
Ok(size)
|
|
|
|
@@ -255,38 +283,59 @@ impl SandboxStorages {
|
|
|
|
|
async fn check(&mut self, logger: &Logger) -> Result<()> {
|
|
|
|
|
for entry in self.0.iter_mut().filter(|e| e.watch) {
|
|
|
|
|
if let Err(e) = entry.scan(logger).await {
|
|
|
|
|
// If an error was observed, we will stop treating this Storage as being watchable, and
|
|
|
|
|
// instead clean up the target-mount files on the tmpfs and bind mount the source_mount_point
|
|
|
|
|
// to target_mount_point.
|
|
|
|
|
error!(logger, "error observed when watching: {:?}", e);
|
|
|
|
|
entry.watch = false;
|
|
|
|
|
match e.downcast_ref::<WatcherError>() {
|
|
|
|
|
Some(WatcherError::MountTooLarge { .. })
|
|
|
|
|
| Some(WatcherError::MountTooManyFiles { .. }) => {
|
|
|
|
|
//
|
|
|
|
|
// If the mount we were watching is too large (bytes), or contains too many unique files,
|
|
|
|
|
// we no longer want to watch. Instead, we'll attempt to create a bind mount and mark this storage
|
|
|
|
|
// as non-watchable. if there's an error in creating bind mount, we'll continue watching.
|
|
|
|
|
//
|
|
|
|
|
// Ensure the target mount point exists:
|
|
|
|
|
if !entry.target_mount_point.as_path().exists() {
|
|
|
|
|
if entry.source_mount_point.as_path().is_dir() {
|
|
|
|
|
fs::create_dir_all(entry.target_mount_point.as_path())
|
|
|
|
|
.await
|
|
|
|
|
.with_context(|| {
|
|
|
|
|
format!(
|
|
|
|
|
"create dir for bindmount {:?}",
|
|
|
|
|
entry.target_mount_point.as_path()
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
} else {
|
|
|
|
|
fs::File::create(entry.target_mount_point.as_path())
|
|
|
|
|
.await
|
|
|
|
|
.with_context(|| {
|
|
|
|
|
format!(
|
|
|
|
|
"create file {:?}",
|
|
|
|
|
entry.target_mount_point.as_path()
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Remove destination contents, but not the directory itself, since this is
|
|
|
|
|
// assumed to be bind-mounted into a container. If source/mount is a file, no need to cleanup
|
|
|
|
|
if entry.target_mount_point.as_path().is_dir() {
|
|
|
|
|
for dir_entry in std::fs::read_dir(entry.target_mount_point.as_path())? {
|
|
|
|
|
let dir_entry = dir_entry?;
|
|
|
|
|
let path = dir_entry.path();
|
|
|
|
|
if dir_entry.file_type()?.is_dir() {
|
|
|
|
|
tokio::fs::remove_dir_all(path).await?;
|
|
|
|
|
} else {
|
|
|
|
|
tokio::fs::remove_file(path).await?;
|
|
|
|
|
match BareMount::new(
|
|
|
|
|
entry.source_mount_point.to_str().unwrap(),
|
|
|
|
|
entry.target_mount_point.to_str().unwrap(),
|
|
|
|
|
"bind",
|
|
|
|
|
MsFlags::MS_BIND,
|
|
|
|
|
"bind",
|
|
|
|
|
logger,
|
|
|
|
|
)
|
|
|
|
|
.mount()
|
|
|
|
|
{
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
entry.watch = false;
|
|
|
|
|
info!(logger, "watchable mount replaced with bind mount")
|
|
|
|
|
}
|
|
|
|
|
Err(e) => error!(logger, "unable to replace watchable: {:?}", e),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_ => warn!(logger, "scan error: {:?}", e),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// - Create bind mount from source to destination
|
|
|
|
|
BareMount::new(
|
|
|
|
|
entry.source_mount_point.to_str().unwrap(),
|
|
|
|
|
entry.target_mount_point.to_str().unwrap(),
|
|
|
|
|
"bind",
|
|
|
|
|
MsFlags::MS_BIND,
|
|
|
|
|
"bind",
|
|
|
|
|
logger,
|
|
|
|
|
)
|
|
|
|
|
.mount()?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -368,7 +417,7 @@ impl BindWatcher {
|
|
|
|
|
for (_, entries) in sandbox_storages.lock().await.iter_mut() {
|
|
|
|
|
if let Err(err) = entries.check(&logger).await {
|
|
|
|
|
// We don't fail background loop, but rather log error instead.
|
|
|
|
|
error!(logger, "Check failed: {}", err);
|
|
|
|
|
warn!(logger, "Check failed: {}", err);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -409,57 +458,74 @@ mod tests {
|
|
|
|
|
use std::fs;
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
async fn create_test_storage(dir: &Path, id: &str) -> Result<(protos::Storage, PathBuf)> {
|
|
|
|
|
let src_path = dir.join(format!("src{}", id));
|
|
|
|
|
let src_filename = src_path.to_str().expect("failed to create src filename");
|
|
|
|
|
let dest_path = dir.join(format!("dest{}", id));
|
|
|
|
|
let dest_filename = dest_path.to_str().expect("failed to create dest filename");
|
|
|
|
|
|
|
|
|
|
std::fs::create_dir_all(src_filename).expect("failed to create path");
|
|
|
|
|
|
|
|
|
|
let storage = protos::Storage {
|
|
|
|
|
source: src_filename.to_string(),
|
|
|
|
|
mount_point: dest_filename.to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok((storage, src_path))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn watch_entries() {
|
|
|
|
|
async fn test_watch_entries() {
|
|
|
|
|
skip_if_not_root!();
|
|
|
|
|
|
|
|
|
|
// If there's an error with an entry, let's make sure it is removed, and that the
|
|
|
|
|
// mount-destination behaves like a standard bind-mount.
|
|
|
|
|
|
|
|
|
|
// Create an entries vector with three storage objects: storage, storage1, storage2.
|
|
|
|
|
// We'll first verify each are evaluated correctly, then increase the first entry's contents
|
|
|
|
|
// so it fails mount size check (>1MB) (test handling for failure on mount that is a directory).
|
|
|
|
|
// We'll then similarly cause failure with storage2 (test handling for failure on mount that is
|
|
|
|
|
// a single file). We'll then verify that storage1 continues to be watchable.
|
|
|
|
|
let source_dir = tempfile::tempdir().unwrap();
|
|
|
|
|
let dest_dir = tempfile::tempdir().unwrap();
|
|
|
|
|
// Create an entries vector with four storage objects: storage0,1,2,3.
|
|
|
|
|
// 0th we'll have fail due to too many files before running a check
|
|
|
|
|
// 1st will just have a single medium sized file, we'll keep it watchable throughout
|
|
|
|
|
// 2nd will have a large file (<1MB), but we'll later make larger to make unwatchable
|
|
|
|
|
// 3rd will have several files, and later we'll make unwatchable by having too many files.
|
|
|
|
|
// We'll run check a couple of times to verify watchable is always watchable, and unwatchable bind mounts
|
|
|
|
|
// match our expectations.
|
|
|
|
|
let dir = tempfile::tempdir().expect("failed to create tempdir");
|
|
|
|
|
|
|
|
|
|
let storage = protos::Storage {
|
|
|
|
|
source: source_dir.path().display().to_string(),
|
|
|
|
|
mount_point: dest_dir.path().display().to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
std::fs::File::create(source_dir.path().join("small.txt"))
|
|
|
|
|
let (storage0, src0_path) = create_test_storage(dir.path(), "1")
|
|
|
|
|
.await
|
|
|
|
|
.expect("failed to create storage");
|
|
|
|
|
let (storage1, src1_path) = create_test_storage(dir.path(), "2")
|
|
|
|
|
.await
|
|
|
|
|
.expect("failed to create storage");
|
|
|
|
|
let (storage2, src2_path) = create_test_storage(dir.path(), "3")
|
|
|
|
|
.await
|
|
|
|
|
.expect("failed to create storage");
|
|
|
|
|
let (storage3, src3_path) = create_test_storage(dir.path(), "4")
|
|
|
|
|
.await
|
|
|
|
|
.expect("failed to create storage");
|
|
|
|
|
|
|
|
|
|
// setup storage0: too many files
|
|
|
|
|
for i in 1..21 {
|
|
|
|
|
fs::write(src0_path.join(format!("{}.txt", i)), "original").unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// setup storage1: two small files
|
|
|
|
|
std::fs::File::create(src1_path.join("small.txt"))
|
|
|
|
|
.unwrap()
|
|
|
|
|
.set_len(10)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(src1_path.join("foo.txt"), "original").unwrap();
|
|
|
|
|
|
|
|
|
|
let source_dir1 = tempfile::tempdir().unwrap();
|
|
|
|
|
let dest_dir1 = tempfile::tempdir().unwrap();
|
|
|
|
|
let storage1 = protos::Storage {
|
|
|
|
|
source: source_dir1.path().display().to_string(),
|
|
|
|
|
mount_point: dest_dir1.path().display().to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
std::fs::File::create(source_dir1.path().join("large.txt"))
|
|
|
|
|
// setup storage2: large file, but still watchable
|
|
|
|
|
std::fs::File::create(src2_path.join("large.txt"))
|
|
|
|
|
.unwrap()
|
|
|
|
|
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
// And finally, create a single file mount:
|
|
|
|
|
let source_dir2 = tempfile::tempdir().unwrap();
|
|
|
|
|
let dest_dir2 = tempfile::tempdir().unwrap();
|
|
|
|
|
|
|
|
|
|
let source_path = source_dir2.path().join("mounted-file");
|
|
|
|
|
let dest_path = dest_dir2.path().join("mounted-file");
|
|
|
|
|
let mounted_file = std::fs::File::create(&source_path).unwrap();
|
|
|
|
|
mounted_file.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT).unwrap();
|
|
|
|
|
|
|
|
|
|
let storage2 = protos::Storage {
|
|
|
|
|
source: source_path.display().to_string(),
|
|
|
|
|
mount_point: dest_path.display().to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
// setup storage3: many files, but still watchable
|
|
|
|
|
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
|
|
|
|
|
fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let logger = slog::Logger::root(slog::Discard, o!());
|
|
|
|
|
|
|
|
|
@@ -468,69 +534,146 @@ mod tests {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
entries
|
|
|
|
|
.add(std::iter::once(storage), &logger)
|
|
|
|
|
.add(std::iter::once(storage0), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
entries
|
|
|
|
|
.add(std::iter::once(storage1), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
entries
|
|
|
|
|
.add(std::iter::once(storage2), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
// Check that there are three entries, and that the
|
|
|
|
|
// destination (mount point) matches what we expect for
|
|
|
|
|
// the first:
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
assert_eq!(entries.0.len(), 3);
|
|
|
|
|
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 1);
|
|
|
|
|
|
|
|
|
|
// Add a second file which will trip file size check:
|
|
|
|
|
std::fs::File::create(source_dir.path().join("big.txt"))
|
|
|
|
|
.unwrap()
|
|
|
|
|
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
|
|
|
|
|
entries
|
|
|
|
|
.add(std::iter::once(storage3), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
// Check that there are four entries
|
|
|
|
|
assert_eq!(entries.0.len(), 4);
|
|
|
|
|
|
|
|
|
|
// Verify Storage 0 is no longer going to be watched:
|
|
|
|
|
//verify that storage 0 is no longer going to be watched, but 1,2,3 are
|
|
|
|
|
assert!(!entries.0[0].watch);
|
|
|
|
|
assert!(entries.0[1].watch);
|
|
|
|
|
assert!(entries.0[2].watch);
|
|
|
|
|
assert!(entries.0[3].watch);
|
|
|
|
|
|
|
|
|
|
// Verify that the directory has two entries:
|
|
|
|
|
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 2);
|
|
|
|
|
assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 8);
|
|
|
|
|
|
|
|
|
|
// Verify that the directory is a bind mount. Add an entry without calling check,
|
|
|
|
|
// and verify that the destination directory includes these files in the case of
|
|
|
|
|
// mount that is no longer being watched (storage), but not within the still-being
|
|
|
|
|
// watched (storage1):
|
|
|
|
|
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir1.path().join("2.txt"), "updated").unwrap();
|
|
|
|
|
//verify target mount points contain expected number of entries:
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
20
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
2
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[2].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
1
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
MAX_ENTRIES_PER_STORAGE
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assert_eq!(std::fs::read_dir(source_dir.path()).unwrap().count(), 3);
|
|
|
|
|
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 3);
|
|
|
|
|
assert_eq!(std::fs::read_dir(source_dir1.path()).unwrap().count(), 2);
|
|
|
|
|
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 1);
|
|
|
|
|
// Add two files to storage 0, verify it is updated without needing to run check:
|
|
|
|
|
fs::write(src0_path.join("1.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(src0_path.join("foo.txt"), "new").unwrap();
|
|
|
|
|
fs::write(src0_path.join("bar.txt"), "new").unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
22
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
fs::read_to_string(&entries.0[0].target_mount_point.as_path().join("1.txt")).unwrap(),
|
|
|
|
|
"updated"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Verify that storage1 is still working. After running check, we expect that the number
|
|
|
|
|
// of entries to increment
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 2);
|
|
|
|
|
//
|
|
|
|
|
// Prepare for second check: update mount sources
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// Break storage2 by increasing the file size
|
|
|
|
|
mounted_file
|
|
|
|
|
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 10)
|
|
|
|
|
// source 3 will become unwatchable
|
|
|
|
|
fs::write(src3_path.join("foo.txt"), "updated").unwrap();
|
|
|
|
|
|
|
|
|
|
// source 2 will become unwatchable:
|
|
|
|
|
std::fs::File::create(src2_path.join("small.txt"))
|
|
|
|
|
.unwrap()
|
|
|
|
|
.set_len(10)
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
// Verify Storage 2 is no longer going to be watched:
|
|
|
|
|
assert!(!entries.0[2].watch);
|
|
|
|
|
|
|
|
|
|
// Verify bind mount is working -- let's write to the file and observe output:
|
|
|
|
|
fs::write(&source_path, "updated").unwrap();
|
|
|
|
|
assert_eq!(fs::read_to_string(&source_path).unwrap(), "updated");
|
|
|
|
|
// source 1: expect just an update
|
|
|
|
|
fs::write(src1_path.join("foo.txt"), "updated").unwrap();
|
|
|
|
|
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
|
|
|
|
|
// verify that only storage 1 is still watchable
|
|
|
|
|
assert!(!entries.0[0].watch);
|
|
|
|
|
assert!(entries.0[1].watch);
|
|
|
|
|
assert!(!entries.0[2].watch);
|
|
|
|
|
assert!(!entries.0[3].watch);
|
|
|
|
|
|
|
|
|
|
// Verify storage 1 was updated, and storage 2,3 are up to date despite no watch
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
22
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
2
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
fs::read_to_string(&entries.0[1].target_mount_point.as_path().join("foo.txt")).unwrap(),
|
|
|
|
|
"updated"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[2].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
2
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
MAX_ENTRIES_PER_STORAGE + 1
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// verify that we can remove files as well, but that it isn't observed until check is run
|
|
|
|
|
// for a watchable mount:
|
|
|
|
|
fs::remove_file(src1_path.join("foo.txt")).unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
2
|
|
|
|
|
);
|
|
|
|
|
assert!(entries.check(&logger).await.is_ok());
|
|
|
|
|
assert_eq!(
|
|
|
|
|
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.count(),
|
|
|
|
|
1
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
@@ -553,7 +696,15 @@ mod tests {
|
|
|
|
|
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 1)
|
|
|
|
|
.unwrap();
|
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
|
|
|
assert!(entry.scan(&logger).await.is_err());
|
|
|
|
|
|
|
|
|
|
// Expect to receive a MountTooLarge error
|
|
|
|
|
match entry.scan(&logger).await {
|
|
|
|
|
Ok(_) => panic!("expected error"),
|
|
|
|
|
Err(e) => match e.downcast_ref::<WatcherError>() {
|
|
|
|
|
Some(WatcherError::MountTooLarge { .. }) => {}
|
|
|
|
|
_ => panic!("unexpected error"),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
|
|
|
|
|
|
|
|
|
|
std::fs::File::create(source_dir.path().join("big.txt"))
|
|
|
|
@@ -561,6 +712,7 @@ mod tests {
|
|
|
|
|
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT - 1)
|
|
|
|
|
.unwrap();
|
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
|
|
|
|
|
|
|
|
assert!(entry.scan(&logger).await.is_ok());
|
|
|
|
|
|
|
|
|
|
std::fs::File::create(source_dir.path().join("too-big.txt"))
|
|
|
|
@@ -568,26 +720,38 @@ mod tests {
|
|
|
|
|
.set_len(2)
|
|
|
|
|
.unwrap();
|
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
|
|
|
assert!(entry.scan(&logger).await.is_err());
|
|
|
|
|
|
|
|
|
|
// Expect to receive a MountTooLarge error
|
|
|
|
|
match entry.scan(&logger).await {
|
|
|
|
|
Ok(_) => panic!("expected error"),
|
|
|
|
|
Err(e) => match e.downcast_ref::<WatcherError>() {
|
|
|
|
|
Some(WatcherError::MountTooLarge { .. }) => {}
|
|
|
|
|
_ => panic!("unexpected error"),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
|
|
|
|
|
fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();
|
|
|
|
|
|
|
|
|
|
// Up to eight files should be okay:
|
|
|
|
|
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("2.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("3.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("4.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("5.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("6.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("7.txt"), "updated").unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("8.txt"), "updated").unwrap();
|
|
|
|
|
assert_eq!(entry.scan(&logger).await.unwrap(), 8);
|
|
|
|
|
// Up to 16 files should be okay:
|
|
|
|
|
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
|
|
|
|
|
fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Nine files is too many:
|
|
|
|
|
fs::write(source_dir.path().join("9.txt"), "updated").unwrap();
|
|
|
|
|
assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE);
|
|
|
|
|
|
|
|
|
|
// 17 files is too many:
|
|
|
|
|
fs::write(source_dir.path().join("17.txt"), "updated").unwrap();
|
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
|
|
|
assert!(entry.scan(&logger).await.is_err());
|
|
|
|
|
|
|
|
|
|
// Expect to receive a MountTooManyFiles error
|
|
|
|
|
match entry.scan(&logger).await {
|
|
|
|
|
Ok(_) => panic!("expected error"),
|
|
|
|
|
Err(e) => match e.downcast_ref::<WatcherError>() {
|
|
|
|
|
Some(WatcherError::MountTooManyFiles { .. }) => {}
|
|
|
|
|
_ => panic!("unexpected error"),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
@@ -768,4 +932,67 @@ mod tests {
|
|
|
|
|
let out = fs::read_to_string(dest_dir.path().join("1.txt")).unwrap();
|
|
|
|
|
assert_eq!(out, "one");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn verify_container_cleanup_watching() {
|
|
|
|
|
skip_if_not_root!();
|
|
|
|
|
|
|
|
|
|
let source_dir = tempfile::tempdir().unwrap();
|
|
|
|
|
fs::write(source_dir.path().join("1.txt"), "one").unwrap();
|
|
|
|
|
|
|
|
|
|
let dest_dir = tempfile::tempdir().unwrap();
|
|
|
|
|
|
|
|
|
|
let storage = protos::Storage {
|
|
|
|
|
source: source_dir.path().display().to_string(),
|
|
|
|
|
mount_point: dest_dir.path().display().to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let logger = slog::Logger::root(slog::Discard, o!());
|
|
|
|
|
let mut watcher = BindWatcher::default();
|
|
|
|
|
|
|
|
|
|
watcher
|
|
|
|
|
.add_container("test".into(), std::iter::once(storage), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
thread::sleep(Duration::from_secs(WATCH_INTERVAL_SECS));
|
|
|
|
|
|
|
|
|
|
let out = fs::read_to_string(dest_dir.path().join("1.txt")).unwrap();
|
|
|
|
|
assert!(dest_dir.path().exists());
|
|
|
|
|
assert_eq!(out, "one");
|
|
|
|
|
|
|
|
|
|
watcher.remove_container("test").await;
|
|
|
|
|
|
|
|
|
|
thread::sleep(Duration::from_secs(WATCH_INTERVAL_SECS));
|
|
|
|
|
assert!(!dest_dir.path().exists());
|
|
|
|
|
|
|
|
|
|
for i in 1..21 {
|
|
|
|
|
fs::write(source_dir.path().join(format!("{}.txt", i)), "fluff").unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// verify non-watched storage is cleaned up correctly
|
|
|
|
|
let storage1 = protos::Storage {
|
|
|
|
|
source: source_dir.path().display().to_string(),
|
|
|
|
|
mount_point: dest_dir.path().display().to_string(),
|
|
|
|
|
..Default::default()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
watcher
|
|
|
|
|
.add_container("test".into(), std::iter::once(storage1), &logger)
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
thread::sleep(Duration::from_secs(WATCH_INTERVAL_SECS));
|
|
|
|
|
|
|
|
|
|
assert!(dest_dir.path().exists());
|
|
|
|
|
assert!(is_mounted(dest_dir.path().to_str().unwrap()).unwrap());
|
|
|
|
|
|
|
|
|
|
watcher.remove_container("test").await;
|
|
|
|
|
|
|
|
|
|
thread::sleep(Duration::from_secs(WATCH_INTERVAL_SECS));
|
|
|
|
|
|
|
|
|
|
assert!(!dest_dir.path().exists());
|
|
|
|
|
assert!(!is_mounted(dest_dir.path().to_str().unwrap()).unwrap());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|