agent: watcher: fixes to make more robust

inotify/watchable-mount changes...

- Allow up to 16 files. It isn't that uncommon to have three files in a secret.
In Kubernetes, this results in nine files in the mount (the presented files,
which are symlinks to the latest files, which are in turn symlinks to the actual
files kept in a separate hidden directory on the mount). Bumping the limit from
eight to 16 helps ensure we can support "most" secrets/tokens, and is still a
pretty small number of entries to scan (see the layout sketch after this list).

- Only replace the watched storage with a bind mount if we observe that there
are too many files or that the mount is too large. Since the scanning/updating
is racy, we should expect to occasionally run into errors (i.e., a file deleted
between the scan and the update). Rather than stopping and creating a bind
mount, continue updating; the changes will be picked up the next time check is
called for that entry (every two seconds today).

To facilitate the 'oversized' handling, we introduce specific errors for a mount
that is too large or has too many files, and handle those errors when scanning
the storage entry (sketched after this list).

- When handling an oversized mount, do not remove the prior files -- we simply
mount the bind mount over them. This avoids the files disappearing from the
user's view, avoids racy cleanup, and simplifies the flow. Similarly, only mark
the entry as a non-watched storage device after the bind mount is created
successfully.

- When creating the bind mount, make sure the destination exists. If a scan had
not previously succeeded, the destination would not exist and the mount would
fail. Update the logic and unit test to cover this (see the bind-target sketch
after this list).

- In several spots, we were returning when there was an error (both in scan and
update). For the update case, just log a warning and continue; since scan/update
is racy, we should expect transient errors that should resolve the next time the
watcher runs.
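
For context, here is a rough sketch of the Kubernetes secret-volume layout the
first bullet refers to. The key names and the timestamped directory name are
hypothetical; the real kubelet layout may differ in detail.

```rust
// Illustrative only: approximate the symlink layering kubelet uses for a
// three-key secret volume (names below are made up for the example).
use std::fs;
use std::os::unix::fs::symlink;
use std::path::Path;

fn make_fake_secret_mount(mount: &Path) -> std::io::Result<()> {
    // Hidden, timestamped directory holding the actual files.
    let data_dir = mount.join("..2021_08_03_22_00_00.123456789");
    fs::create_dir_all(&data_dir)?;
    for key in &["username", "password", "token"] {
        fs::write(data_dir.join(key), b"secret-value")?; // the actual files
    }
    // "..data" points at the current timestamped directory.
    symlink(&data_dir, mount.join("..data"))?;
    // The presented files are symlinks that resolve through "..data".
    for key in &["username", "password", "token"] {
        symlink(Path::new("..data").join(key), mount.join(key))?;
    }
    Ok(())
}
```

Counting the presented symlinks, the entries they resolve to under `..data`, and
the actual files in the hidden directory is how three logical files become the
nine entries mentioned above, which is why a limit of eight was too tight.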
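
Below is a condensed sketch of the error plumbing described above, using the
names the diff introduces (WatcherError, MAX_ENTRIES_PER_STORAGE,
MAX_SIZE_PER_WATCHABLE_MOUNT); the helper functions here are simplified
illustrations, not the agent's actual code.

```rust
// Sketch only: oversize conditions become typed errors so the periodic check
// can tell "stop watching, bind mount instead" apart from transient
// scan/update races that should simply be retried.
use anyhow::{ensure, Result};
use thiserror::Error;

const MAX_ENTRIES_PER_STORAGE: usize = 16;
const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;

#[derive(Error, Debug)]
pub enum WatcherError {
    #[error(
        "Too many file system entries to watch within: {mnt} ({count} must be < {})",
        MAX_ENTRIES_PER_STORAGE
    )]
    MountTooManyFiles { count: usize, mnt: String },

    #[error(
        "Mount too large to watch: {mnt} ({size} must be < {})",
        MAX_SIZE_PER_WATCHABLE_MOUNT
    )]
    MountTooLarge { size: u64, mnt: String },
}

// Raise the typed errors while scanning.
fn check_limits(count: usize, size: u64, mnt: &str) -> Result<()> {
    ensure!(
        count <= MAX_ENTRIES_PER_STORAGE,
        WatcherError::MountTooManyFiles { count, mnt: mnt.to_string() }
    );
    ensure!(
        size <= MAX_SIZE_PER_WATCHABLE_MOUNT,
        WatcherError::MountTooLarge { size, mnt: mnt.to_string() }
    );
    Ok(())
}

// Decide what to do with a scan failure during the periodic check.
fn handle_scan_error(e: anyhow::Error) {
    match e.downcast_ref::<WatcherError>() {
        // Oversized: stop watching this storage and fall back to a bind mount.
        Some(WatcherError::MountTooLarge { .. })
        | Some(WatcherError::MountTooManyFiles { .. }) => {
            // create the bind mount; clear the watch flag only on success
        }
        // Anything else is assumed transient (the scan races with updates);
        // keep watching and let the next periodic check retry.
        _ => eprintln!("transient watcher error: {:?}", e),
    }
}
```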
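
Finally, a minimal sketch of the "make sure the destination exists" step from
the bind-mount bullet; this is a standalone restatement with hypothetical
function and parameter names, not the agent's exact helper.

```rust
// Sketch: before replacing a watchable mount with a bind mount, create the
// target as a directory or an empty file so it matches the source type and
// the mount call has something to mount onto.
use anyhow::{Context, Result};
use std::path::Path;
use tokio::fs;

async fn ensure_bind_mount_target(source: &Path, target: &Path) -> Result<()> {
    if !target.exists() {
        if source.is_dir() {
            fs::create_dir_all(target)
                .await
                .with_context(|| format!("create dir for bindmount {:?}", target))?;
        } else {
            fs::File::create(target)
                .await
                .with_context(|| format!("create file {:?}", target))?;
        }
    }
    Ok(())
}
```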

Fixes: #2402

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
Eric Ernst 2021-08-03 15:11:03 -07:00
parent 2aa686a0f5
commit 961aaff004
3 changed files with 292 additions and 133 deletions

src/agent/Cargo.lock (generated)

@ -549,6 +549,7 @@ dependencies = [
"slog-scope",
"slog-stdlog",
"tempfile",
"thiserror",
"tokio",
"tokio-vsock",
"tracing",
@ -1511,18 +1512,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.24"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e"
checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.24"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0"
checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745"
dependencies = [
"proc-macro2 1.0.26",
"quote 1.0.9",

src/agent/Cargo.toml

@ -18,6 +18,7 @@ capctl = "0.2.0"
serde_json = "1.0.39"
scan_fmt = "0.2.3"
scopeguard = "1.0.0"
thiserror = "1.0.26"
regex = "1"
# Async helpers

src/agent/src/watcher.rs

@ -12,17 +12,17 @@ use tokio::fs;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::{self, Duration};
use thiserror::Error;
use anyhow::{ensure, Context, Result};
use async_recursion::async_recursion;
use nix::mount::{umount, MsFlags};
use slog::{debug, error, Logger};
use slog::{debug, error, info, warn, Logger};
use crate::mount::BareMount;
use crate::protocols::agent as protos;
/// The maximum number of file system entries agent will watch for each mount.
const MAX_ENTRIES_PER_STORAGE: usize = 8;
const MAX_ENTRIES_PER_STORAGE: usize = 16;
/// The maximum size of a watchable mount in bytes.
const MAX_SIZE_PER_WATCHABLE_MOUNT: u64 = 1024 * 1024;
@ -44,13 +44,28 @@ struct Storage {
target_mount_point: PathBuf,
/// Flag to indicate that the Storage should be watched. Storage will be watched until
/// the source becomes too large, either in number of files (>8) or total size (>1MB).
/// the source becomes too large, either in number of files (>16) or total size (>1MB).
watch: bool,
/// The list of files to watch from the source mount point and updated in the target one.
watched_files: HashMap<PathBuf, SystemTime>,
}
#[derive(Error, Debug)]
pub enum WatcherError {
#[error(
"Too many file system entries within to watch within: {mnt} ({count} must be < {})",
MAX_ENTRIES_PER_STORAGE
)]
MountTooManyFiles { count: usize, mnt: String },
#[error(
"Mount too large to watch: {mnt} ({size} must be < {})",
MAX_SIZE_PER_WATCHABLE_MOUNT
)]
MountTooLarge { size: u64, mnt: String },
}
impl Drop for Storage {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.target_mount_point);
@ -65,7 +80,6 @@ impl Storage {
watch: true,
watched_files: HashMap::new(),
};
Ok(entry)
}
@ -143,7 +157,9 @@ impl Storage {
// Update identified files:
for path in &updated_files {
self.update_target(logger, path.as_path()).await?;
if let Err(e) = self.update_target(logger, path.as_path()).await {
error!(logger, "failure in update_target: {:?}", e);
}
}
Ok(updated_files.len())
@ -172,8 +188,10 @@ impl Storage {
ensure!(
self.watched_files.len() <= MAX_ENTRIES_PER_STORAGE,
"Too many file system entries to watch (must be < {})",
MAX_ENTRIES_PER_STORAGE
WatcherError::MountTooManyFiles {
count: self.watched_files.len(),
mnt: self.source_mount_point.display().to_string()
}
);
// Insert will return old entry if any
@ -201,10 +219,13 @@ impl Storage {
size += res_size;
}
}
ensure!(
size <= MAX_SIZE_PER_WATCHABLE_MOUNT,
"Too many file system entries to watch (must be < {})",
MAX_SIZE_PER_WATCHABLE_MOUNT,
WatcherError::MountTooLarge {
size,
mnt: self.source_mount_point.display().to_string()
}
);
Ok(size)
@ -255,38 +276,59 @@ impl SandboxStorages {
async fn check(&mut self, logger: &Logger) -> Result<()> {
for entry in self.0.iter_mut().filter(|e| e.watch) {
if let Err(e) = entry.scan(logger).await {
// If an error was observed, we will stop treating this Storage as being watchable, and
// instead clean up the target-mount files on the tmpfs and bind mount the source_mount_point
// to target_mount_point.
error!(logger, "error observed when watching: {:?}", e);
entry.watch = false;
match e.downcast_ref::<WatcherError>() {
Some(WatcherError::MountTooLarge { .. })
| Some(WatcherError::MountTooManyFiles { .. }) => {
//
// If the mount we were watching is too large (bytes), or contains too many unique files,
// we no longer want to watch. Instead, we'll attempt to create a bind mount and mark this storage
// as non-watchable. If there's an error creating the bind mount, we'll continue watching.
//
// Ensure the target mount point exists:
if !entry.target_mount_point.as_path().exists() {
if entry.source_mount_point.as_path().is_dir() {
fs::create_dir_all(entry.target_mount_point.as_path())
.await
.with_context(|| {
format!(
"create dir for bindmount {:?}",
entry.target_mount_point.as_path()
)
})?;
} else {
fs::File::create(entry.target_mount_point.as_path())
.await
.with_context(|| {
format!(
"create file {:?}",
entry.target_mount_point.as_path()
)
})?;
}
}
// Remove destination contents, but not the directory itself, since this is
// assumed to be bind-mounted into a container. If source/mount is a file, no need to cleanup
if entry.target_mount_point.as_path().is_dir() {
for dir_entry in std::fs::read_dir(entry.target_mount_point.as_path())? {
let dir_entry = dir_entry?;
let path = dir_entry.path();
if dir_entry.file_type()?.is_dir() {
tokio::fs::remove_dir_all(path).await?;
} else {
tokio::fs::remove_file(path).await?;
match BareMount::new(
entry.source_mount_point.to_str().unwrap(),
entry.target_mount_point.to_str().unwrap(),
"bind",
MsFlags::MS_BIND,
"bind",
logger,
)
.mount()
{
Ok(_) => {
entry.watch = false;
info!(logger, "watchable mount replaced with bind mount")
}
Err(e) => error!(logger, "unable to replace watchable: {:?}", e),
}
}
_ => warn!(logger, "scan error: {:?}", e),
}
// - Create bind mount from source to destination
BareMount::new(
entry.source_mount_point.to_str().unwrap(),
entry.target_mount_point.to_str().unwrap(),
"bind",
MsFlags::MS_BIND,
"bind",
logger,
)
.mount()?;
}
}
Ok(())
}
}
@ -368,7 +410,7 @@ impl BindWatcher {
for (_, entries) in sandbox_storages.lock().await.iter_mut() {
if let Err(err) = entries.check(&logger).await {
// We don't fail background loop, but rather log error instead.
error!(logger, "Check failed: {}", err);
warn!(logger, "Check failed: {}", err);
}
}
}
@ -409,57 +451,74 @@ mod tests {
use std::fs;
use std::thread;
async fn create_test_storage(dir: &Path, id: &str) -> Result<(protos::Storage, PathBuf)> {
let src_path = dir.join(format!("src{}", id));
let src_filename = src_path.to_str().expect("failed to create src filename");
let dest_path = dir.join(format!("dest{}", id));
let dest_filename = dest_path.to_str().expect("failed to create dest filename");
std::fs::create_dir_all(src_filename).expect("failed to create path");
let storage = protos::Storage {
source: src_filename.to_string(),
mount_point: dest_filename.to_string(),
..Default::default()
};
Ok((storage, src_path))
}
#[tokio::test]
async fn watch_entries() {
async fn test_watch_entries() {
skip_if_not_root!();
// If there's an error with an entry, let's make sure it is removed, and that the
// mount-destination behaves like a standard bind-mount.
// Create an entries vector with three storage objects: storage, storage1, storage2.
// We'll first verify each are evaluated correctly, then increase the first entry's contents
// so it fails mount size check (>1MB) (test handling for failure on mount that is a directory).
// We'll then similarly cause failure with storage2 (test handling for failure on mount that is
// a single file). We'll then verify that storage1 continues to be watchable.
let source_dir = tempfile::tempdir().unwrap();
let dest_dir = tempfile::tempdir().unwrap();
// Create an entries vector with four storage objects: storage0,1,2,3.
// 0th we'll make fail due to too many files before running a check
// 1st will just have a single medium-sized file; we'll keep it watchable throughout
// 2nd will have a large file (<1MB), but we'll later make it larger so it becomes unwatchable
// 3rd will have several files, and later we'll make it unwatchable by adding too many files.
// We'll run check a couple of times to verify watchable is always watchable, and unwatchable bind mounts
// match our expectations.
let dir = tempfile::tempdir().expect("failed to create tempdir");
let storage = protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
};
std::fs::File::create(source_dir.path().join("small.txt"))
let (storage0, src0_path) = create_test_storage(dir.path(), "1")
.await
.expect("failed to create storage");
let (storage1, src1_path) = create_test_storage(dir.path(), "2")
.await
.expect("failed to create storage");
let (storage2, src2_path) = create_test_storage(dir.path(), "3")
.await
.expect("failed to create storage");
let (storage3, src3_path) = create_test_storage(dir.path(), "4")
.await
.expect("failed to create storage");
// setup storage0: too many files
for i in 1..21 {
fs::write(src0_path.join(format!("{}.txt", i)), "original").unwrap();
}
// setup storage1: two small files
std::fs::File::create(src1_path.join("small.txt"))
.unwrap()
.set_len(10)
.unwrap();
fs::write(src1_path.join("foo.txt"), "original").unwrap();
let source_dir1 = tempfile::tempdir().unwrap();
let dest_dir1 = tempfile::tempdir().unwrap();
let storage1 = protos::Storage {
source: source_dir1.path().display().to_string(),
mount_point: dest_dir1.path().display().to_string(),
..Default::default()
};
std::fs::File::create(source_dir1.path().join("large.txt"))
// setup storage2: large file, but still watchable
std::fs::File::create(src2_path.join("large.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
.unwrap();
// And finally, create a single file mount:
let source_dir2 = tempfile::tempdir().unwrap();
let dest_dir2 = tempfile::tempdir().unwrap();
let source_path = source_dir2.path().join("mounted-file");
let dest_path = dest_dir2.path().join("mounted-file");
let mounted_file = std::fs::File::create(&source_path).unwrap();
mounted_file.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT).unwrap();
let storage2 = protos::Storage {
source: source_path.display().to_string(),
mount_point: dest_path.display().to_string(),
..Default::default()
};
// setup storage3: many files, but still watchable
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap();
}
let logger = slog::Logger::root(slog::Discard, o!());
@ -468,69 +527,146 @@ mod tests {
};
entries
.add(std::iter::once(storage), &logger)
.add(std::iter::once(storage0), &logger)
.await
.unwrap();
entries
.add(std::iter::once(storage1), &logger)
.await
.unwrap();
entries
.add(std::iter::once(storage2), &logger)
.await
.unwrap();
// Check that there are three entries, and that the
// destination (mount point) matches what we expect for
// the first:
assert!(entries.check(&logger).await.is_ok());
assert_eq!(entries.0.len(), 3);
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 1);
// Add a second file which will trip file size check:
std::fs::File::create(source_dir.path().join("big.txt"))
.unwrap()
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT)
entries
.add(std::iter::once(storage3), &logger)
.await
.unwrap();
assert!(entries.check(&logger).await.is_ok());
// Check that there are four entries
assert_eq!(entries.0.len(), 4);
// Verify Storage 0 is no longer going to be watched:
//verify that storage 0 is no longer going to be watched, but 1,2,3 are
assert!(!entries.0[0].watch);
assert!(entries.0[1].watch);
assert!(entries.0[2].watch);
assert!(entries.0[3].watch);
// Verify that the directory has two entries:
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 2);
assert_eq!(std::fs::read_dir(dir.path()).unwrap().count(), 8);
// Verify that the directory is a bind mount. Add an entry without calling check,
// and verify that the destination directory includes these files in the case of
// mount that is no longer being watched (storage), but not within the still-being
// watched (storage1):
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
fs::write(source_dir1.path().join("2.txt"), "updated").unwrap();
//verify target mount points contain expected number of entries:
assert_eq!(
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
.unwrap()
.count(),
20
);
assert_eq!(
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
.unwrap()
.count(),
2
);
assert_eq!(
std::fs::read_dir(entries.0[2].target_mount_point.as_path())
.unwrap()
.count(),
1
);
assert_eq!(
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
.unwrap()
.count(),
MAX_ENTRIES_PER_STORAGE
);
assert_eq!(std::fs::read_dir(source_dir.path()).unwrap().count(), 3);
assert_eq!(std::fs::read_dir(dest_dir.path()).unwrap().count(), 3);
assert_eq!(std::fs::read_dir(source_dir1.path()).unwrap().count(), 2);
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 1);
// Add two files to storage 0, verify it is updated without needing to run check:
fs::write(src0_path.join("1.txt"), "updated").unwrap();
fs::write(src0_path.join("foo.txt"), "new").unwrap();
fs::write(src0_path.join("bar.txt"), "new").unwrap();
assert_eq!(
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
.unwrap()
.count(),
22
);
assert_eq!(
fs::read_to_string(&entries.0[0].target_mount_point.as_path().join("1.txt")).unwrap(),
"updated"
);
// Verify that storage1 is still working. After running check, we expect that the number
// of entries to increment
assert!(entries.check(&logger).await.is_ok());
assert_eq!(std::fs::read_dir(dest_dir1.path()).unwrap().count(), 2);
//
// Prepare for second check: update mount sources
//
// Break storage2 by increasing the file size
mounted_file
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 10)
// source 3 will become unwatchable
fs::write(src3_path.join("foo.txt"), "updated").unwrap();
// source 2 will become unwatchable:
std::fs::File::create(src2_path.join("small.txt"))
.unwrap()
.set_len(10)
.unwrap();
assert!(entries.check(&logger).await.is_ok());
// Verify Storage 2 is no longer going to be watched:
assert!(!entries.0[2].watch);
// Verify bind mount is working -- let's write to the file and observe output:
fs::write(&source_path, "updated").unwrap();
assert_eq!(fs::read_to_string(&source_path).unwrap(), "updated");
// source 1: expect just an update
fs::write(src1_path.join("foo.txt"), "updated").unwrap();
assert!(entries.check(&logger).await.is_ok());
// verify that only storage 1 is still watchable
assert!(!entries.0[0].watch);
assert!(entries.0[1].watch);
assert!(!entries.0[2].watch);
assert!(!entries.0[3].watch);
// Verify storage 1 was updated, and storage 2,3 are up to date despite no watch
assert_eq!(
std::fs::read_dir(entries.0[0].target_mount_point.as_path())
.unwrap()
.count(),
22
);
assert_eq!(
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
.unwrap()
.count(),
2
);
assert_eq!(
fs::read_to_string(&entries.0[1].target_mount_point.as_path().join("foo.txt")).unwrap(),
"updated"
);
assert_eq!(
std::fs::read_dir(entries.0[2].target_mount_point.as_path())
.unwrap()
.count(),
2
);
assert_eq!(
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
.unwrap()
.count(),
MAX_ENTRIES_PER_STORAGE + 1
);
// verify that we can remove files as well, but that it isn't observed until check is run
// for a watchable mount:
fs::remove_file(src1_path.join("foo.txt")).unwrap();
assert_eq!(
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
.unwrap()
.count(),
2
);
assert!(entries.check(&logger).await.is_ok());
assert_eq!(
std::fs::read_dir(entries.0[1].target_mount_point.as_path())
.unwrap()
.count(),
1
);
}
#[tokio::test]
@ -553,7 +689,15 @@ mod tests {
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT + 1)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
// Expect to receive a MountTooLarge error
match entry.scan(&logger).await {
Ok(_) => panic!("expected error"),
Err(e) => match e.downcast_ref::<WatcherError>() {
Some(WatcherError::MountTooLarge { .. }) => {}
_ => panic!("unexpected error"),
},
}
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
std::fs::File::create(source_dir.path().join("big.txt"))
@ -561,6 +705,7 @@ mod tests {
.set_len(MAX_SIZE_PER_WATCHABLE_MOUNT - 1)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_ok());
std::fs::File::create(source_dir.path().join("too-big.txt"))
@ -568,26 +713,38 @@ mod tests {
.set_len(2)
.unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
// Expect to receive a MountTooLarge error
match entry.scan(&logger).await {
Ok(_) => panic!("expected error"),
Err(e) => match e.downcast_ref::<WatcherError>() {
Some(WatcherError::MountTooLarge { .. }) => {}
_ => panic!("unexpected error"),
},
}
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();
// Up to eight files should be okay:
fs::write(source_dir.path().join("1.txt"), "updated").unwrap();
fs::write(source_dir.path().join("2.txt"), "updated").unwrap();
fs::write(source_dir.path().join("3.txt"), "updated").unwrap();
fs::write(source_dir.path().join("4.txt"), "updated").unwrap();
fs::write(source_dir.path().join("5.txt"), "updated").unwrap();
fs::write(source_dir.path().join("6.txt"), "updated").unwrap();
fs::write(source_dir.path().join("7.txt"), "updated").unwrap();
fs::write(source_dir.path().join("8.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 8);
// Up to 16 files should be okay:
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap();
}
// Nine files is too many:
fs::write(source_dir.path().join("9.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE);
// 17 files is too many:
fs::write(source_dir.path().join("17.txt"), "updated").unwrap();
thread::sleep(Duration::from_secs(1));
assert!(entry.scan(&logger).await.is_err());
// Expect to receive a MountTooManyFiles error
match entry.scan(&logger).await {
Ok(_) => panic!("expected error"),
Err(e) => match e.downcast_ref::<WatcherError>() {
Some(WatcherError::MountTooManyFiles { .. }) => {}
_ => panic!("unexpected error"),
},
}
}
#[tokio::test]