agent: watcher: fixes to make more robust

inotify/watchable-mount changes...

- Allow up to 16 files. It isn't that uncommon to have 3 files in a secret.
In Kubernetes, this results in 9 files in the mount (the presented files,
which are symlinks to the latest files, which are symlinks to the actual
files, which live in a separate hidden directory on the mount). Bumping the
limit from 8 to 16 helps ensure we can support "most" secrets/tokens, and is
still a pretty small number of entries to scan.
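For illustration only, here is a minimal sketch of why a three-key secret can already blow past the old limit of eight entries; the directory names, the counting helper, and the symlink-following walk below are assumptions made for this example, not code from the agent:

    // Sketch: recreate the layout Kubernetes' atomic writer produces for a
    // secret with three keys, then count every file path a symlink-following
    // walk would visit.
    use std::fs;
    use std::os::unix::fs::symlink;
    use std::path::Path;

    fn count_file_paths(dir: &Path) -> std::io::Result<usize> {
        let mut count = 0;
        for entry in fs::read_dir(dir)? {
            let path = entry?.path();
            // metadata() follows symlinks, so "..data" is walked like a directory
            if path.metadata()?.is_dir() {
                count += count_file_paths(&path)?;
            } else {
                count += 1;
            }
        }
        Ok(count)
    }

    fn main() -> std::io::Result<()> {
        let mount = std::env::temp_dir().join("secret-mount-example");
        let _ = fs::remove_dir_all(&mount);
        let hidden = mount.join("..2021_08_03_00_00_00.000000001");
        fs::create_dir_all(&hidden)?;

        for key in ["ca.crt", "namespace", "token"] {
            fs::write(hidden.join(key), "secret")?; // actual file
            symlink(format!("..data/{}", key), mount.join(key))?; // presented file
        }
        symlink(hidden.file_name().unwrap(), mount.join("..data"))?; // "latest" link

        // 3 presented symlinks + 3 paths under ..data + 3 files in the hidden
        // directory = 9 paths, already above the previous limit of 8.
        println!("paths visited: {}", count_file_paths(&mount)?);
        Ok(())
    }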

- Only replace the watched storage with a bind mount if we observe that
there are too many files or that it is too large. Since the scanning/updating
is racy, we should expect to occasionally run into errors (i.e., a file
deleted between the scan and the update). Rather than stopping and making a
bind mount, continue updating; the changes will be picked up the next time
check is called for that entry (every 2 seconds today).

To facilitate the 'oversized' handling, we introduce specific errors for "too
large" and "too many files", and handle these specific errors when scanning
the storage entry.
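A minimal sketch of that classification, assuming the same thiserror/anyhow pairing the agent uses; the constants, message text, and the handle_scan_result helper are illustrative for this note rather than the agent's actual API:

    use anyhow::Result;
    use thiserror::Error;

    const MAX_ENTRIES_PER_STORAGE: usize = 16;
    const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;

    #[derive(Error, Debug)]
    pub enum WatcherError {
        #[error("too many entries to watch in {mnt} ({count} must be <= {})", MAX_ENTRIES_PER_STORAGE)]
        MountTooManyFiles { count: usize, mnt: String },
        #[error("mount too large to watch: {mnt} ({size} must be <= {})", MAX_SIZE_PER_WATCHABLE_MOUNT)]
        MountTooLarge { size: u64, mnt: String },
    }

    // Only the two "oversized" errors demote a mount to a plain bind mount;
    // anything else is treated as transient and retried on the next check.
    fn handle_scan_result(res: Result<usize>) {
        if let Err(e) = res {
            match e.downcast_ref::<WatcherError>() {
                Some(WatcherError::MountTooLarge { .. })
                | Some(WatcherError::MountTooManyFiles { .. }) => {
                    println!("stop watching, fall back to a bind mount: {}", e);
                }
                _ => println!("transient scan error, will retry: {}", e),
            }
        }
    }

    fn main() {
        handle_scan_result(Err(anyhow::Error::new(WatcherError::MountTooManyFiles {
            count: 20,
            mnt: "/some/secret/mount".into(),
        })));
        handle_scan_result(Err(anyhow::Error::new(std::io::Error::from(
            std::io::ErrorKind::NotFound,
        ))));
    }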

- When handling an oversized mount, do not remove the prior files -- we just
mount the bind mount over them. This avoids the files disappearing from under
the user, avoids racy cleanup, and simplifies the flow. Similarly, only mark
the storage as non-watched after the bind mount is created successfully.

- When creating the bind mount, make sure the destination exists. If there
had not been a successful scan before, the target would not exist and the
mount would fail. Update the logic and unit test to cover this.
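A rough sketch of the fallback described in the previous two bullets, assuming nix::mount is called directly (the agent goes through its BareMount wrapper, so the function name and signature below are illustrative only):

    use std::path::Path;

    use anyhow::{Context, Result};
    use nix::mount::{mount, MsFlags};

    fn bind_mount_over_target(source: &Path, target: &Path, watch: &mut bool) -> Result<()> {
        // The target may never have been populated (no successful scan yet), so
        // create it first: a directory for directory sources, otherwise an empty file.
        if !target.exists() {
            if source.is_dir() {
                std::fs::create_dir_all(target)
                    .with_context(|| format!("create dir for bindmount {:?}", target))?;
            } else {
                std::fs::File::create(target)
                    .with_context(|| format!("create file {:?}", target))?;
            }
        }

        // Mount straight over whatever the watcher copied there earlier; the old
        // files are shadowed by the bind mount instead of being deleted first.
        mount(Some(source), target, Some("bind"), MsFlags::MS_BIND, None::<&str>)
            .with_context(|| format!("bind mount {:?} onto {:?}", source, target))?;

        // Only flip the flag once the bind mount is actually in place.
        *watch = false;
        Ok(())
    }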

- In several spots, we were returning as soon as an error occurred (both in
scan and update). For the update case, just log a warning and continue;
since the scan/update is racy, we should expect transient errors which will
resolve the next time the watcher runs.

Fixes: #2402

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
Eric Ernst 2021-08-03 15:11:03 -07:00
parent 2aa686a0f5
commit 961aaff004
3 changed files with 292 additions and 133 deletions

src/agent/Cargo.lock (generated)

@@ -549,6 +549,7 @@ dependencies = [
  "slog-scope",
  "slog-stdlog",
  "tempfile",
+ "thiserror",
  "tokio",
  "tokio-vsock",
  "tracing",
@@ -1511,18 +1512,18 @@ dependencies = [

 [[package]]
 name = "thiserror"
-version = "1.0.24"
+version = "1.0.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e"
+checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2"
 dependencies = [
  "thiserror-impl",
 ]

 [[package]]
 name = "thiserror-impl"
-version = "1.0.24"
+version = "1.0.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0"
+checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745"
 dependencies = [
  "proc-macro2 1.0.26",
  "quote 1.0.9",

src/agent/Cargo.toml

@@ -18,6 +18,7 @@ capctl = "0.2.0"
 serde_json = "1.0.39"
 scan_fmt = "0.2.3"
 scopeguard = "1.0.0"
+thiserror = "1.0.26"
 regex = "1"

 # Async helpers

src/agent/src/watcher.rs

@@ -12,17 +12,17 @@ use tokio::fs;
 use tokio::sync::Mutex;
 use tokio::task;
 use tokio::time::{self, Duration};
+use thiserror::Error;

 use anyhow::{ensure, Context, Result};
 use async_recursion::async_recursion;
 use nix::mount::{umount, MsFlags};
-use slog::{debug, error, Logger};
+use slog::{debug, error, info, warn, Logger};

 use crate::mount::BareMount;
 use crate::protocols::agent as protos;

 /// The maximum number of file system entries agent will watch for each mount.
-const MAX_ENTRIES_PER_STORAGE: usize = 8;
+const MAX_ENTRIES_PER_STORAGE: usize = 16;

 /// The maximum size of a watchable mount in bytes.
 const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;
@@ -44,13 +44,28 @@ struct Storage {
     target_mount_point: PathBuf,

     /// Flag to indicate that the Storage should be watched. Storage will be watched until
-    /// the source becomes too large, either in number of files (>8) or total size (>1MB).
+    /// the source becomes too large, either in number of files (>16) or total size (>1MB).
     watch: bool,

     /// The list of files to watch from the source mount point and updated in the target one.
     watched_files: HashMap<PathBuf, SystemTime>,
 }

+#[derive(Error, Debug)]
+pub enum WatcherError {
+    #[error(
+        "Too many file system entries within to watch within: {mnt} ({count} must be < {})",
+        MAX_ENTRIES_PER_STORAGE
+    )]
+    MountTooManyFiles { count: usize, mnt: String },
+
+    #[error(
+        "Mount too large to watch: {mnt} ({size} must be < {})",
+        MAX_SIZE_PER_WATCHABLE_MOUNT
+    )]
+    MountTooLarge { size: u64, mnt: String },
+}
+
 impl Drop for Storage {
     fn drop(&mut self) {
         let _ = std::fs::remove_dir_all(&self.target_mount_point);
@@ -65,7 +80,6 @@ impl Storage {
             watch: true,
             watched_files: HashMap::new(),
         };
-
         Ok(entry)
     }
@@ -143,7 +157,9 @@ impl Storage {
         // Update identified files:
         for path in &updated_files {
-            self.update_target(logger, path.as_path()).await?;
+            if let Err(e) = self.update_target(logger, path.as_path()).await {
+                error!(logger, "failure in update_target: {:?}", e);
+            }
         }

         Ok(updated_files.len())
@@ -172,8 +188,10 @@ impl Storage {
         ensure!(
             self.watched_files.len() <= MAX_ENTRIES_PER_STORAGE,
-            "Too many file system entries to watch (must be < {})",
-            MAX_ENTRIES_PER_STORAGE
+            WatcherError::MountTooManyFiles {
+                count: self.watched_files.len(),
+                mnt: self.source_mount_point.display().to_string()
+            }
         );

         // Insert will return old entry if any
@@ -201,10 +219,13 @@ impl Storage {
                 size += res_size;
             }
         }
+
         ensure!(
             size <= MAX_SIZE_PER_WATCHABLE_MOUNT,
-            "Too many file system entries to watch (must be < {})",
-            MAX_SIZE_PER_WATCHABLE_MOUNT,
+            WatcherError::MountTooLarge {
+                size,
+                mnt: self.source_mount_point.display().to_string()
+            }
         );

         Ok(size)
@@ -255,38 +276,59 @@ impl SandboxStorages {
     async fn check(&mut self, logger: &Logger) -> Result<()> {
         for entry in self.0.iter_mut().filter(|e| e.watch) {
             if let Err(e) = entry.scan(logger).await {
-                // If an error was observed, we will stop treating this Storage as being watchable, and
-                // instead clean up the target-mount files on the tmpfs and bind mount the source_mount_point
-                // to target_mount_point.
-                error!(logger, "error observed when watching: {:?}", e);
-                entry.watch = false;
-
-                // Remove destination contents, but not the directory itself, since this is
-                // assumed to be bind-mounted into a container. If source/mount is a file, no need to cleanup
-                if entry.target_mount_point.as_path().is_dir() {
-                    for dir_entry in std::fs::read_dir(entry.target_mount_point.as_path())? {
-                        let dir_entry = dir_entry?;
-                        let path = dir_entry.path();
-                        if dir_entry.file_type()?.is_dir() {
-                            tokio::fs::remove_dir_all(path).await?;
-                        } else {
-                            tokio::fs::remove_file(path).await?;
-                        }
-                    }
-                }
-
-                // - Create bind mount from source to destination
-                BareMount::new(
-                    entry.source_mount_point.to_str().unwrap(),
-                    entry.target_mount_point.to_str().unwrap(),
-                    "bind",
-                    MsFlags::MS_BIND,
-                    "bind",
-                    logger,
-                )
-                .mount()?;
+                match e.downcast_ref::<WatcherError>() {
+                    Some(WatcherError::MountTooLarge { .. })
+                    | Some(WatcherError::MountTooManyFiles { .. }) => {
+                        //
+                        // If the mount we were watching is too large (bytes), or contains too many unique files,
+                        // we no longer want to watch. Instead, we'll attempt to create a bind mount and mark this storage
+                        // as non-watchable. if there's an error in creating bind mount, we'll continue watching.
+                        //
+
+                        // Ensure the target mount point exists:
+                        if !entry.target_mount_point.as_path().exists() {
+                            if entry.source_mount_point.as_path().is_dir() {
+                                fs::create_dir_all(entry.target_mount_point.as_path())
+                                    .await
+                                    .with_context(|| {
+                                        format!(
+                                            "create dir for bindmount {:?}",
+                                            entry.target_mount_point.as_path()
+                                        )
+                                    })?;
+                            } else {
+                                fs::File::create(entry.target_mount_point.as_path())
+                                    .await
+                                    .with_context(|| {
+                                        format!(
+                                            "create file {:?}",
+                                            entry.target_mount_point.as_path()
+                                        )
+                                    })?;
+                            }
+                        }
+
+                        match BareMount::new(
+                            entry.source_mount_point.to_str().unwrap(),
+                            entry.target_mount_point.to_str().unwrap(),
+                            "bind",
+                            MsFlags::MS_BIND,
+                            "bind",
+                            logger,
+                        )
+                        .mount()
+                        {
+                            Ok(_) => {
+                                entry.watch = false;
+                                info!(logger, "watchable mount replaced with bind mount")
+                            }
+                            Err(e) => error!(logger, "unable to replace watchable: {:?}", e),
+                        }
+                    }
+                    _ => warn!(logger, "scan error: {:?}", e),
+                }
             }
         }
         Ok(())
     }
 }
@@ -368,7 +410,7 @@ impl BindWatcher {
             for (_, entries) in sandbox_storages.lock().await.iter_mut() {
                 if let Err(err) = entries.check(&logger).await {
                     // We don't fail background loop, but rather log error instead.
-                    error!(logger, "Check failed: {}", err);
+                    warn!(logger, "Check failed: {}", err);
                 }
             }
         }
@@ -409,57 +451,74 @@ mod tests {
     use std::fs;
     use std::thread;

+    async fn create_test_storage(dir: &Path, id: &str) -> Result<(protos::Storage, PathBuf)> {
+        let src_path = dir.join(format!("src{}", id));
+        let src_filename = src_path.to_str().expect("failed to create src filename");
+        let dest_path = dir.join(format!("dest{}", id));
+        let dest_filename = dest_path.to_str().expect("failed to create dest filename");
+
+        std::fs::create_dir_all(src_filename).expect("failed to create path");
+
+        let storage = protos::Storage {
+            source: src_filename.to_string(),
+            mount_point: dest_filename.to_string(),
+            ..Default::default()
+        };
+
+        Ok((storage, src_path))
+    }
+
     #[tokio::test]
-    async fn watch_entries() {
+    async fn test_watch_entries() {
         skip_if_not_root!();
         // If there's an error with an entry, let's make sure it is removed, and that the
         // mount-destination behaves like a standard bind-mount.

-        // Create an entries vector with three storage objects: storage, storage1, storage2.
-        // We'll first verify each are evaluated correctly, then increase the first entry's contents
-        // so it fails mount size check (>1MB) (test handling for failure on mount that is a directory).
-        // We'll then similarly cause failure with storage2 (test handling for failure on mount that is
-        // a single file). We'll then verify that storage1 continues to be watchable.
-        let source_dir = tempfile::tempdir().unwrap();
-        let dest_dir = tempfile::tempdir().unwrap();
-
-        let storage = protos::Storage {
-            source: source_dir.path().display().to_string(),
-            mount_point: dest_dir.path().display().to_string(),
-            ..Default::default()
-        };
-
-        std::fs::File::create(source_dir.path().join("small.txt"))
+        // Create an entries vector with four storage objects: storage0,1,2,3.
+        // 0th we'll have fail due to too many files before running a check
+        // 1st will just have a single medium sized file, we'll keep it watchable throughout
+        // 2nd will have a large file (<1MB), but we'll later make larger to make unwatchable
+        // 3rd will have several files, and later we'll make unwatchable by having too many files.
+        // We'll run check a couple of times to verify watchable is always watchable, and unwatchable bind mounts
+        // match our expectations.
+        let dir = tempfile::tempdir().expect("failed to create tempdir");
+
+        let (storage0, src0_path) = create_test_storage(dir.path(), "1")
+            .await
+            .expect("failed to create storage");
+        let (storage1, src1_path) = create_test_storage(dir.path(), "2")
+            .await
+            .expect("failed to create storage");
+        let (storage2, src2_path) = create_test_storage(dir.path(), "3")
+            .await
+            .expect("failed to create storage");
+        let (storage3, src3_path) = create_test_storage(dir.path(), "4")
+            .await
+            .expect("failed to create storage");
+
+        // setup storage0: too many files
+        for i in 1..21 {
+            fs::write(src0_path.join(format!("{}.txt", i)), "original").unwrap();
+        }
+
+        // setup storage1: two small files
+        std::fs::File::create(src1_path.join("small.txt"))
             .unwrap()
             .set_len(10)
             .unwrap();
+        fs::write(src1_path.join("foo.txt"), "original").unwrap();

-        let source_dir1 = tempfile::tempdir().unwrap();
-        let dest_dir1 = tempfile::tempdir().unwrap();
-
-        let storage1 = protos::Storage {
-            source: source_dir1.path().display().to_string(),
-            mount_point: dest_dir1.path().display().to_string(),
-            ..Default::default()
-        };
-
-        std::fs::File::create(source_dir1.path().join("large.txt"))
+        // setup storage2: large file, but still watchable
+        std::fs::File::create(src2_path.join("large.txt"))
             .unwrap()
             .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
             .unwrap();

-        // And finally, create a single file mount:
-        let source_dir2 = tempfile::tempdir().unwrap();
-        let dest_dir2 = tempfile::tempdir().unwrap();
-
-        let source_path = source_dir2.path().join("mounted-file");
-        let dest_path = dest_dir2.path().join("mounted-file");
-
-        let mounted_file = std::fs::File::create(&source_path).unwrap();
-        mounted_file.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT).unwrap();
-
-        let storage2 = protos::Storage {
-            source: source_path.display().to_string(),
-            mount_point: dest_path.display().to_string(),
-            ..Default::default()
-        };
+        // setup storage3: many files, but still watchable
+        for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
+            fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap();
+        }

         let logger = slog::Logger::root(slog::Discard, o!());
@@ -468,69 +527,146 @@ mod tests {
         };

         entries
-            .add(std::iter::once(storage), &logger)
+            .add(std::iter::once(storage0), &logger)
             .await
             .unwrap();
         entries
             .add(std::iter::once(storage1), &logger)
             .await
             .unwrap();
         entries
             .add(std::iter::once(storage2), &logger)
             .await
             .unwrap();
+        entries
+            .add(std::iter::once(storage3), &logger)
+            .await
+            .unwrap();

-        // Check that there are three entries, and that the
-        // destination (mount point) matches what we expect for
-        // the first:
         assert!(entries.check(&logger).await.is_ok());
-        assert_eq!(entries.0.len(), 3);
-        assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 1);
-
-        // Add a second file which will trip file size check:
-        std::fs::File::create(source_dir.path().join("big.txt"))
-            .unwrap()
-            .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
-            .unwrap();
-
-        assert!(entries.check(&logger).await.is_ok());
+
+        // Check that there are four entries
+        assert_eq!(entries.0.len(), 4);

-        // Verify Storage 0 is no longer going to be watched:
+        //verify that storage 0 is no longer going to be watched, but 1,2,3 are
         assert!(!entries.0[0].watch);
+        assert!(entries.0[1].watch);
+        assert!(entries.0[2].watch);
+        assert!(entries.0[3].watch);

-        // Verify that the directory has two entries:
-        assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 2);
+        assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 8);

-        // Verify that the directory is a bind mount. Add an entry without calling check,
-        // and verify that the destination directory includes these files in the case of
-        // mount that is no longer being watched (storage), but not within the still-being
-        // watched (storage1):
-        fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
-        fs::write(source_dir1.path().join("2.txt"), "updated").unwrap();
-
-        assert_eq!(std::fs::read_dir(source_dir.path()).unwrap().count(), 3);
-        assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 3);
-        assert_eq!(std::fs::read_dir(source_dir1.path()).unwrap().count(), 2);
-        assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 1);
-
-        // Verify that storage1 is still working. After running check, we expect that the number
-        // of entries to increment
-        assert!(entries.check(&logger).await.is_ok());
-        assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 2);
-
-        // Break storage2 by increasing the file size
-        mounted_file
-            .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 10)
-            .unwrap();
-
-        assert!(entries.check(&logger).await.is_ok());
-
-        // Verify Storage 2 is no longer going to be watched:
-        assert!(!entries.0[2].watch);
-
-        // Verify bind mount is working -- let's write to the file and observe output:
-        fs::write(&source_path, "updated").unwrap();
-        assert_eq!(fs::read_to_string(&source_path).unwrap(), "updated");
+        //verify target mount points contain expected number of entries:
+        assert_eq!(
+            std::fs::read_dir(entries.0[0].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            20
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[1].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            2
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[2].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            1
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[3].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            MAX_ENTRIES_PER_STORAGE
+        );
+
+        // Add two files to storage 0, verify it is updated without needing to run check:
+        fs::write(src0_path.join("1.txt"), "updated").unwrap();
+        fs::write(src0_path.join("foo.txt"), "new").unwrap();
+        fs::write(src0_path.join("bar.txt"), "new").unwrap();
+        assert_eq!(
+            std::fs::read_dir(entries.0[0].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            22
+        );
+        assert_eq!(
+            fs::read_to_string(&entries.0[0].target_mount_point.as_path().join("1.txt")).unwrap(),
+            "updated"
+        );
+
+        //
+        // Prepare for second check: update mount sources
+        //
+
+        // source 3 will become unwatchable
+        fs::write(src3_path.join("foo.txt"), "updated").unwrap();
+
+        // source 2 will become unwatchable:
+        std::fs::File::create(src2_path.join("small.txt"))
+            .unwrap()
+            .set_len(10)
+            .unwrap();
+
+        // source 1: expect just an update
+        fs::write(src1_path.join("foo.txt"), "updated").unwrap();
+
+        assert!(entries.check(&logger).await.is_ok());
+
+        // verify that only storage 1 is still watchable
+        assert!(!entries.0[0].watch);
+        assert!(entries.0[1].watch);
+        assert!(!entries.0[2].watch);
+        assert!(!entries.0[3].watch);
+
+        // Verify storage 1 was updated, and storage 2,3 are up to date despite no watch
+        assert_eq!(
+            std::fs::read_dir(entries.0[0].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            22
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[1].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            2
+        );
+        assert_eq!(
+            fs::read_to_string(&entries.0[1].target_mount_point.as_path().join("foo.txt")).unwrap(),
+            "updated"
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[2].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            2
+        );
+        assert_eq!(
+            std::fs::read_dir(entries.0[3].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            MAX_ENTRIES_PER_STORAGE + 1
+        );
+
+        // verify that we can remove files as well, but that it isn't observed until check is run
+        // for a watchable mount:
+        fs::remove_file(src1_path.join("foo.txt")).unwrap();
+        assert_eq!(
+            std::fs::read_dir(entries.0[1].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            2
+        );
+        assert!(entries.check(&logger).await.is_ok());
+        assert_eq!(
+            std::fs::read_dir(entries.0[1].target_mount_point.as_path())
+                .unwrap()
+                .count(),
+            1
+        );
     }

     #[tokio::test]
@@ -553,7 +689,15 @@ mod tests {
             .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 1)
             .unwrap();
         thread::sleep(Duration::from_secs(1));
-        assert!(entry.scan(&logger).await.is_err());
+
+        // Expect to receive a MountTooLarge error
+        match entry.scan(&logger).await {
+            Ok(_) => panic!("expected error"),
+            Err(e) => match e.downcast_ref::<WatcherError>() {
+                Some(WatcherError::MountTooLarge { .. }) => {}
+                _ => panic!("unexpected error"),
+            },
+        }

         fs::remove_file(source_dir.path().join("big.txt")).unwrap();
         std::fs::File::create(source_dir.path().join("big.txt"))
@@ -561,6 +705,7 @@ mod tests {
             .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT - 1)
             .unwrap();
         thread::sleep(Duration::from_secs(1));
+
         assert!(entry.scan(&logger).await.is_ok());

         std::fs::File::create(source_dir.path().join("too-big.txt"))
@@ -568,26 +713,38 @@ mod tests {
             .set_len(2)
             .unwrap();
         thread::sleep(Duration::from_secs(1));
-        assert!(entry.scan(&logger).await.is_err());
+
+        // Expect to receive a MountTooLarge error
+        match entry.scan(&logger).await {
+            Ok(_) => panic!("expected error"),
+            Err(e) => match e.downcast_ref::<WatcherError>() {
+                Some(WatcherError::MountTooLarge { .. }) => {}
+                _ => panic!("unexpected error"),
+            },
+        }

         fs::remove_file(source_dir.path().join("big.txt")).unwrap();
         fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();

-        // Up to eight files should be okay:
-        fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("2.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("3.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("4.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("5.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("6.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("7.txt"), "updated").unwrap();
-        fs::write(source_dir.path().join("8.txt"), "updated").unwrap();
-
-        assert_eq!(entry.scan(&logger).await.unwrap(), 8);
+        // Up to 16 files should be okay:
+        for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
+            fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap();
+        }
+
+        assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE);

-        // Nine files is too many:
-        fs::write(source_dir.path().join("9.txt"), "updated").unwrap();
+        // 17 files is too many:
+        fs::write(source_dir.path().join("17.txt"), "updated").unwrap();
         thread::sleep(Duration::from_secs(1));
-        assert!(entry.scan(&logger).await.is_err());
+
+        // Expect to receive a MountTooManyFiles error
+        match entry.scan(&logger).await {
+            Ok(_) => panic!("expected error"),
+            Err(e) => match e.downcast_ref::<WatcherError>() {
+                Some(WatcherError::MountTooManyFiles { .. }) => {}
+                _ => panic!("unexpected error"),
+            },
+        }
     }

     #[tokio::test]