diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index f222aff2d3..f922acba94 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -549,6 +549,7 @@ dependencies = [ "slog-scope", "slog-stdlog", "tempfile", + "thiserror", "tokio", "tokio-vsock", "tracing", @@ -1511,18 +1512,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.24" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" +checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.24" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" +checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745" dependencies = [ "proc-macro2 1.0.26", "quote 1.0.9", diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 72e1b49a27..10cf2082c5 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -18,6 +18,7 @@ capctl = "0.2.0" serde_json = "1.0.39" scan_fmt = "0.2.3" scopeguard = "1.0.0" +thiserror = "1.0.26" regex = "1" # Async helpers diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index 010733f4ee..d03b4f97d5 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -12,17 +12,17 @@ use tokio::fs; use tokio::sync::Mutex; use tokio::task; use tokio::time::{self, Duration}; - +use thiserror::Error; use anyhow::{ensure, Context, Result}; use async_recursion::async_recursion; use nix::mount::{umount, MsFlags}; -use slog::{debug, error, Logger}; +use slog::{debug, error, info, warn, Logger}; use crate::mount::BareMount; use crate::protocols::agent as protos; /// The maximum number of file system entries agent will watch for each mount. -const MAX_ENTRIES_PER_STORAGE: usize = 8; +const MAX_ENTRIES_PER_STORAGE: usize = 16; /// The maximum size of a watchable mount in bytes. const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024; @@ -44,13 +44,28 @@ struct Storage { target_mount_point: PathBuf, /// Flag to indicate that the Storage should be watched. Storage will be watched until - /// the source becomes too large, either in number of files (>8) or total size (>1MB). + /// the source becomes too large, either in number of files (>16) or total size (>1MB). watch: bool, /// The list of files to watch from the source mount point and updated in the target one. watched_files: HashMap, } +#[derive(Error, Debug)] +pub enum WatcherError { + #[error( + "Too many file system entries within to watch within: {mnt} ({count} must be < {})", + MAX_ENTRIES_PER_STORAGE + )] + MountTooManyFiles { count: usize, mnt: String }, + + #[error( + "Mount too large to watch: {mnt} ({size} must be < {})", + MAX_SIZE_PER_WATCHABLE_MOUNT + )] + MountTooLarge { size: u64, mnt: String }, +} + impl Drop for Storage { fn drop(&mut self) { let _ = std::fs::remove_dir_all(&self.target_mount_point); @@ -65,7 +80,6 @@ impl Storage { watch: true, watched_files: HashMap::new(), }; - Ok(entry) } @@ -143,7 +157,9 @@ impl Storage { // Update identified files: for path in &updated_files { - self.update_target(logger, path.as_path()).await?; + if let Err(e) = self.update_target(logger, path.as_path()).await { + error!(logger, "failure in update_target: {:?}", e); + } } Ok(updated_files.len()) @@ -172,8 +188,10 @@ impl Storage { ensure!( self.watched_files.len() <= MAX_ENTRIES_PER_STORAGE, - "Too many file system entries to watch (must be < {})", - MAX_ENTRIES_PER_STORAGE + WatcherError::MountTooManyFiles { + count: self.watched_files.len(), + mnt: self.source_mount_point.display().to_string() + } ); // Insert will return old entry if any @@ -201,10 +219,13 @@ impl Storage { size += res_size; } } + ensure!( size <= MAX_SIZE_PER_WATCHABLE_MOUNT, - "Too many file system entries to watch (must be < {})", - MAX_SIZE_PER_WATCHABLE_MOUNT, + WatcherError::MountTooLarge { + size, + mnt: self.source_mount_point.display().to_string() + } ); Ok(size) @@ -255,38 +276,59 @@ impl SandboxStorages { async fn check(&mut self, logger: &Logger) -> Result<()> { for entry in self.0.iter_mut().filter(|e| e.watch) { if let Err(e) = entry.scan(logger).await { - // If an error was observed, we will stop treating this Storage as being watchable, and - // instead clean up the target-mount files on the tmpfs and bind mount the source_mount_point - // to target_mount_point. - error!(logger, "error observed when watching: {:?}", e); - entry.watch = false; + match e.downcast_ref::() { + Some(WatcherError::MountTooLarge { .. }) + | Some(WatcherError::MountTooManyFiles { .. }) => { + // + // If the mount we were watching is too large (bytes), or contains too many unique files, + // we no longer want to watch. Instead, we'll attempt to create a bind mount and mark this storage + // as non-watchable. if there's an error in creating bind mount, we'll continue watching. + // + // Ensure the target mount point exists: + if !entry.target_mount_point.as_path().exists() { + if entry.source_mount_point.as_path().is_dir() { + fs::create_dir_all(entry.target_mount_point.as_path()) + .await + .with_context(|| { + format!( + "create dir for bindmount {:?}", + entry.target_mount_point.as_path() + ) + })?; + } else { + fs::File::create(entry.target_mount_point.as_path()) + .await + .with_context(|| { + format!( + "create file {:?}", + entry.target_mount_point.as_path() + ) + })?; + } + } - // Remove destination contents, but not the directory itself, since this is - // assumed to be bind-mounted into a container. If source/mount is a file, no need to cleanup - if entry.target_mount_point.as_path().is_dir() { - for dir_entry in std::fs::read_dir(entry.target_mount_point.as_path())? { - let dir_entry = dir_entry?; - let path = dir_entry.path(); - if dir_entry.file_type()?.is_dir() { - tokio::fs::remove_dir_all(path).await?; - } else { - tokio::fs::remove_file(path).await?; + match BareMount::new( + entry.source_mount_point.to_str().unwrap(), + entry.target_mount_point.to_str().unwrap(), + "bind", + MsFlags::MS_BIND, + "bind", + logger, + ) + .mount() + { + Ok(_) => { + entry.watch = false; + info!(logger, "watchable mount replaced with bind mount") + } + Err(e) => error!(logger, "unable to replace watchable: {:?}", e), } } + _ => warn!(logger, "scan error: {:?}", e), } - - // - Create bind mount from source to destination - BareMount::new( - entry.source_mount_point.to_str().unwrap(), - entry.target_mount_point.to_str().unwrap(), - "bind", - MsFlags::MS_BIND, - "bind", - logger, - ) - .mount()?; } } + Ok(()) } } @@ -368,7 +410,7 @@ impl BindWatcher { for (_, entries) in sandbox_storages.lock().await.iter_mut() { if let Err(err) = entries.check(&logger).await { // We don't fail background loop, but rather log error instead. - error!(logger, "Check failed: {}", err); + warn!(logger, "Check failed: {}", err); } } } @@ -409,57 +451,74 @@ mod tests { use std::fs; use std::thread; + async fn create_test_storage(dir: &Path, id: &str) -> Result<(protos::Storage, PathBuf)> { + let src_path = dir.join(format!("src{}", id)); + let src_filename = src_path.to_str().expect("failed to create src filename"); + let dest_path = dir.join(format!("dest{}", id)); + let dest_filename = dest_path.to_str().expect("failed to create dest filename"); + + std::fs::create_dir_all(src_filename).expect("failed to create path"); + + let storage = protos::Storage { + source: src_filename.to_string(), + mount_point: dest_filename.to_string(), + ..Default::default() + }; + + Ok((storage, src_path)) + } + #[tokio::test] - async fn watch_entries() { + async fn test_watch_entries() { skip_if_not_root!(); // If there's an error with an entry, let's make sure it is removed, and that the // mount-destination behaves like a standard bind-mount. - // Create an entries vector with three storage objects: storage, storage1, storage2. - // We'll first verify each are evaluated correctly, then increase the first entry's contents - // so it fails mount size check (>1MB) (test handling for failure on mount that is a directory). - // We'll then similarly cause failure with storage2 (test handling for failure on mount that is - // a single file). We'll then verify that storage1 continues to be watchable. - let source_dir = tempfile::tempdir().unwrap(); - let dest_dir = tempfile::tempdir().unwrap(); + // Create an entries vector with four storage objects: storage0,1,2,3. + // 0th we'll have fail due to too many files before running a check + // 1st will just have a single medium sized file, we'll keep it watchable throughout + // 2nd will have a large file (<1MB), but we'll later make larger to make unwatchable + // 3rd will have several files, and later we'll make unwatchable by having too many files. + // We'll run check a couple of times to verify watchable is always watchable, and unwatchable bind mounts + // match our expectations. + let dir = tempfile::tempdir().expect("failed to create tempdir"); - let storage = protos::Storage { - source: source_dir.path().display().to_string(), - mount_point: dest_dir.path().display().to_string(), - ..Default::default() - }; - std::fs::File::create(source_dir.path().join("small.txt")) + let (storage0, src0_path) = create_test_storage(dir.path(), "1") + .await + .expect("failed to create storage"); + let (storage1, src1_path) = create_test_storage(dir.path(), "2") + .await + .expect("failed to create storage"); + let (storage2, src2_path) = create_test_storage(dir.path(), "3") + .await + .expect("failed to create storage"); + let (storage3, src3_path) = create_test_storage(dir.path(), "4") + .await + .expect("failed to create storage"); + + // setup storage0: too many files + for i in 1..21 { + fs::write(src0_path.join(format!("{}.txt", i)), "original").unwrap(); + } + + // setup storage1: two small files + std::fs::File::create(src1_path.join("small.txt")) .unwrap() .set_len(10) .unwrap(); + fs::write(src1_path.join("foo.txt"), "original").unwrap(); - let source_dir1 = tempfile::tempdir().unwrap(); - let dest_dir1 = tempfile::tempdir().unwrap(); - let storage1 = protos::Storage { - source: source_dir1.path().display().to_string(), - mount_point: dest_dir1.path().display().to_string(), - ..Default::default() - }; - std::fs::File::create(source_dir1.path().join("large.txt")) + // setup storage2: large file, but still watchable + std::fs::File::create(src2_path.join("large.txt")) .unwrap() .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT) .unwrap(); - // And finally, create a single file mount: - let source_dir2 = tempfile::tempdir().unwrap(); - let dest_dir2 = tempfile::tempdir().unwrap(); - - let source_path = source_dir2.path().join("mounted-file"); - let dest_path = dest_dir2.path().join("mounted-file"); - let mounted_file = std::fs::File::create(&source_path).unwrap(); - mounted_file.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT).unwrap(); - - let storage2 = protos::Storage { - source: source_path.display().to_string(), - mount_point: dest_path.display().to_string(), - ..Default::default() - }; + // setup storage3: many files, but still watchable + for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap(); + } let logger = slog::Logger::root(slog::Discard, o!()); @@ -468,69 +527,146 @@ mod tests { }; entries - .add(std::iter::once(storage), &logger) + .add(std::iter::once(storage0), &logger) .await .unwrap(); - entries .add(std::iter::once(storage1), &logger) .await .unwrap(); - entries .add(std::iter::once(storage2), &logger) .await .unwrap(); - - // Check that there are three entries, and that the - // destination (mount point) matches what we expect for - // the first: - assert!(entries.check(&logger).await.is_ok()); - assert_eq!(entries.0.len(), 3); - assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 1); - - // Add a second file which will trip file size check: - std::fs::File::create(source_dir.path().join("big.txt")) - .unwrap() - .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT) + entries + .add(std::iter::once(storage3), &logger) + .await .unwrap(); assert!(entries.check(&logger).await.is_ok()); + // Check that there are four entries + assert_eq!(entries.0.len(), 4); - // Verify Storage 0 is no longer going to be watched: + //verify that storage 0 is no longer going to be watched, but 1,2,3 are assert!(!entries.0[0].watch); + assert!(entries.0[1].watch); + assert!(entries.0[2].watch); + assert!(entries.0[3].watch); - // Verify that the directory has two entries: - assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 2); + assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 8); - // Verify that the directory is a bind mount. Add an entry without calling check, - // and verify that the destination directory includes these files in the case of - // mount that is no longer being watched (storage), but not within the still-being - // watched (storage1): - fs::write(source_dir.path().join("1.txt"), "updated").unwrap(); - fs::write(source_dir1.path().join("2.txt"), "updated").unwrap(); + //verify target mount points contain expected number of entries: + assert_eq!( + std::fs::read_dir(entries.0[0].target_mount_point.as_path()) + .unwrap() + .count(), + 20 + ); + assert_eq!( + std::fs::read_dir(entries.0[1].target_mount_point.as_path()) + .unwrap() + .count(), + 2 + ); + assert_eq!( + std::fs::read_dir(entries.0[2].target_mount_point.as_path()) + .unwrap() + .count(), + 1 + ); + assert_eq!( + std::fs::read_dir(entries.0[3].target_mount_point.as_path()) + .unwrap() + .count(), + MAX_ENTRIES_PER_STORAGE + ); - assert_eq!(std::fs::read_dir(source_dir.path()).unwrap().count(), 3); - assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 3); - assert_eq!(std::fs::read_dir(source_dir1.path()).unwrap().count(), 2); - assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 1); + // Add two files to storage 0, verify it is updated without needing to run check: + fs::write(src0_path.join("1.txt"), "updated").unwrap(); + fs::write(src0_path.join("foo.txt"), "new").unwrap(); + fs::write(src0_path.join("bar.txt"), "new").unwrap(); + assert_eq!( + std::fs::read_dir(entries.0[0].target_mount_point.as_path()) + .unwrap() + .count(), + 22 + ); + assert_eq!( + fs::read_to_string(&entries.0[0].target_mount_point.as_path().join("1.txt")).unwrap(), + "updated" + ); - // Verify that storage1 is still working. After running check, we expect that the number - // of entries to increment - assert!(entries.check(&logger).await.is_ok()); - assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 2); + // + // Prepare for second check: update mount sources + // - // Break storage2 by increasing the file size - mounted_file - .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 10) + // source 3 will become unwatchable + fs::write(src3_path.join("foo.txt"), "updated").unwrap(); + + // source 2 will become unwatchable: + std::fs::File::create(src2_path.join("small.txt")) + .unwrap() + .set_len(10) .unwrap(); - assert!(entries.check(&logger).await.is_ok()); - // Verify Storage 2 is no longer going to be watched: - assert!(!entries.0[2].watch); - // Verify bind mount is working -- let's write to the file and observe output: - fs::write(&source_path, "updated").unwrap(); - assert_eq!(fs::read_to_string(&source_path).unwrap(), "updated"); + // source 1: expect just an update + fs::write(src1_path.join("foo.txt"), "updated").unwrap(); + + assert!(entries.check(&logger).await.is_ok()); + + // verify that only storage 1 is still watchable + assert!(!entries.0[0].watch); + assert!(entries.0[1].watch); + assert!(!entries.0[2].watch); + assert!(!entries.0[3].watch); + + // Verify storage 1 was updated, and storage 2,3 are up to date despite no watch + assert_eq!( + std::fs::read_dir(entries.0[0].target_mount_point.as_path()) + .unwrap() + .count(), + 22 + ); + assert_eq!( + std::fs::read_dir(entries.0[1].target_mount_point.as_path()) + .unwrap() + .count(), + 2 + ); + assert_eq!( + fs::read_to_string(&entries.0[1].target_mount_point.as_path().join("foo.txt")).unwrap(), + "updated" + ); + + assert_eq!( + std::fs::read_dir(entries.0[2].target_mount_point.as_path()) + .unwrap() + .count(), + 2 + ); + assert_eq!( + std::fs::read_dir(entries.0[3].target_mount_point.as_path()) + .unwrap() + .count(), + MAX_ENTRIES_PER_STORAGE + 1 + ); + + // verify that we can remove files as well, but that it isn't observed until check is run + // for a watchable mount: + fs::remove_file(src1_path.join("foo.txt")).unwrap(); + assert_eq!( + std::fs::read_dir(entries.0[1].target_mount_point.as_path()) + .unwrap() + .count(), + 2 + ); + assert!(entries.check(&logger).await.is_ok()); + assert_eq!( + std::fs::read_dir(entries.0[1].target_mount_point.as_path()) + .unwrap() + .count(), + 1 + ); } #[tokio::test] @@ -553,7 +689,15 @@ mod tests { .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 1) .unwrap(); thread::sleep(Duration::from_secs(1)); - assert!(entry.scan(&logger).await.is_err()); + + // Expect to receive a MountTooLarge error + match entry.scan(&logger).await { + Ok(_) => panic!("expected error"), + Err(e) => match e.downcast_ref::() { + Some(WatcherError::MountTooLarge { .. }) => {} + _ => panic!("unexpected error"), + }, + } fs::remove_file(source_dir.path().join("big.txt")).unwrap(); std::fs::File::create(source_dir.path().join("big.txt")) @@ -561,6 +705,7 @@ mod tests { .set_len(MAX_SIZE_PER_WATCHABLE_MOUNT - 1) .unwrap(); thread::sleep(Duration::from_secs(1)); + assert!(entry.scan(&logger).await.is_ok()); std::fs::File::create(source_dir.path().join("too-big.txt")) @@ -568,26 +713,38 @@ mod tests { .set_len(2) .unwrap(); thread::sleep(Duration::from_secs(1)); - assert!(entry.scan(&logger).await.is_err()); + + // Expect to receive a MountTooLarge error + match entry.scan(&logger).await { + Ok(_) => panic!("expected error"), + Err(e) => match e.downcast_ref::() { + Some(WatcherError::MountTooLarge { .. }) => {} + _ => panic!("unexpected error"), + }, + } fs::remove_file(source_dir.path().join("big.txt")).unwrap(); fs::remove_file(source_dir.path().join("too-big.txt")).unwrap(); - // Up to eight files should be okay: - fs::write(source_dir.path().join("1.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("2.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("3.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("4.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("5.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("6.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("7.txt"), "updated").unwrap(); - fs::write(source_dir.path().join("8.txt"), "updated").unwrap(); - assert_eq!(entry.scan(&logger).await.unwrap(), 8); + // Up to 16 files should be okay: + for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap(); + } - // Nine files is too many: - fs::write(source_dir.path().join("9.txt"), "updated").unwrap(); + assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE); + + // 17 files is too many: + fs::write(source_dir.path().join("17.txt"), "updated").unwrap(); thread::sleep(Duration::from_secs(1)); - assert!(entry.scan(&logger).await.is_err()); + + // Expect to receive a MountTooManyFiles error + match entry.scan(&logger).await { + Ok(_) => panic!("expected error"), + Err(e) => match e.downcast_ref::() { + Some(WatcherError::MountTooManyFiles { .. }) => {} + _ => panic!("unexpected error"), + }, + } } #[tokio::test]