watchers: handle symlinked directories, dir removal

- Even a directory could be a symlink - check for this. This is very
common when using configmaps/secrets
- Add unit test to better mimic a configmap, configmap update
- We would never remove directories before. Let's ensure that these are
added to the watched_list, and verify in unit tests
- Update unit tests which exercise maximum number of files per entry. There's a change
in behavior now that we consider directories/symlinks watchable as well.
For these tests, it means we support one less file in a watchable mount.

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
This commit is contained in:
Eric Ernst 2021-11-15 14:46:09 -08:00
parent 2b6dfe414a
commit 296e76f8ee

View File

@ -49,7 +49,7 @@ struct Storage {
/// the source becomes too large, either in number of files (>16) or total size (>1MB).
watch: bool,
/// The list of files to watch from the source mount point and updated in the target one.
/// The list of files, directories, symlinks to watch from the source mount point and updated in the target one.
watched_files: HashMap<PathBuf, SystemTime>,
}
@ -80,9 +80,13 @@ impl Drop for Storage {
}
async fn copy(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<()> {
// if source is a symlink, just create new symlink with same link source
if fs::symlink_metadata(&from).await?.file_type().is_symlink() {
fs::symlink(fs::read_link(&from).await?, to).await?;
// if source is a symlink, create new symlink with same link source. If
// the symlink exists, remove and create new one:
if fs::symlink_metadata(&to).await.is_ok() {
fs::remove_file(&to).await?;
}
fs::symlink(fs::read_link(&from).await?, &to).await?;
} else {
fs::copy(from, to).await?;
}
@ -103,6 +107,16 @@ impl Storage {
async fn update_target(&self, logger: &Logger, source_path: impl AsRef<Path>) -> Result<()> {
let source_file_path = source_path.as_ref();
// if we are creating a directory: just create it, nothing more to do
if source_file_path.symlink_metadata()?.file_type().is_dir() {
fs::create_dir_all(source_file_path)
.await
.with_context(|| {
format!("Unable to mkdir all for {}", source_file_path.display())
})?
}
// Assume we are dealing with either a file or a symlink now:
let dest_file_path = if self.source_mount_point.is_file() {
// Simple file to file copy
// Assume target mount is a file path
@ -139,7 +153,7 @@ impl Storage {
let mut remove_list = Vec::new();
let mut updated_files: Vec<PathBuf> = Vec::new();
// Remove deleted files for tracking list
// Remove deleted files for tracking list.
self.watched_files.retain(|st, _| {
if st.exists() {
true
@ -151,10 +165,19 @@ impl Storage {
// Delete from target
for path in remove_list {
// File has been deleted, remove it from target mount
let target = self.make_target_path(path)?;
debug!(logger, "Removing file from mount: {}", target.display());
let _ = fs::remove_file(target).await;
// The target may be a directory or a file. If it is a directory that is removed,
// we'll remove all files under that directory as well. Because of this, there's a
// chance the target (a subdirectory or file under a prior removed target) was already
// removed. Make sure we check if the target exists before checking the metadata, and
// don't return an error if the remove fails
if target.exists() && target.symlink_metadata()?.file_type().is_dir() {
debug!(logger, "Removing a directory: {}", target.display());
let _ = fs::remove_dir_all(target).await;
} else {
debug!(logger, "Removing a file: {}", target.display());
let _ = fs::remove_file(target).await;
}
}
// Scan new & changed files
@ -186,15 +209,16 @@ impl Storage {
let mut size: u64 = 0;
debug!(logger, "Scanning path: {}", path.display());
if path.is_file() {
let metadata = path
.metadata()
.with_context(|| format!("Failed to query metadata for: {}", path.display()))?;
let metadata = path
.symlink_metadata()
.with_context(|| format!("Failed to query metadata for: {}", path.display()))?;
let modified = metadata
.modified()
.with_context(|| format!("Failed to get modified date for: {}", path.display()))?;
let modified = metadata
.modified()
.with_context(|| format!("Failed to get modified date for: {}", path.display()))?;
// Treat files and symlinks the same:
if path.is_file() || metadata.file_type().is_symlink() {
size += metadata.len();
// Insert will return old entry if any
@ -216,6 +240,16 @@ impl Storage {
}
);
} else {
// Handling regular directories - check to see if this directory is already being tracked, and
// track if not:
if self
.watched_files
.insert(path.to_path_buf(), modified)
.is_none()
{
update_list.push(path.to_path_buf());
}
// Scan dir recursively
let mut entries = fs::read_dir(path)
.await
@ -616,7 +650,7 @@ mod tests {
.unwrap();
// setup storage3: many files, but still watchable
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
for i in 1..MAX_ENTRIES_PER_STORAGE {
fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap();
}
@ -678,7 +712,7 @@ mod tests {
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
.unwrap()
.count(),
MAX_ENTRIES_PER_STORAGE
MAX_ENTRIES_PER_STORAGE - 1
);
// Add two files to storage 0, verify it is updated without needing to run check:
@ -748,7 +782,7 @@ mod tests {
std::fs::read_dir(entries.0[3].target_mount_point.as_path())
.unwrap()
.count(),
MAX_ENTRIES_PER_STORAGE + 1
MAX_ENTRIES_PER_STORAGE
);
// verify that we can remove files as well, but that it isn't observed until check is run
@ -826,15 +860,20 @@ mod tests {
fs::remove_file(source_dir.path().join("big.txt")).unwrap();
fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();
// Up to 16 files should be okay:
for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
// Up to 15 files should be okay (can watch 15 files + 1 directory)
for i in 1..MAX_ENTRIES_PER_STORAGE {
fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap();
}
assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE);
assert_eq!(
entry.scan(&logger).await.unwrap(),
MAX_ENTRIES_PER_STORAGE - 1
);
// 17 files is too many:
fs::write(source_dir.path().join("17.txt"), "updated").unwrap();
// 16 files wll be too many:
fs::write(source_dir.path().join("16.txt"), "updated").unwrap();
thread::sleep(Duration::from_secs(1));
// Expect to receive a MountTooManyFiles error
@ -881,19 +920,67 @@ mod tests {
}
#[tokio::test]
async fn watch_directory_with_symlinks() {
// Prepare source directory:
// ./tmp/.data/file.txt
// ./tmp/1.txt -> ./tmp/.data/file.txt
async fn watch_directory_verify_dir_removal() {
let source_dir = tempfile::tempdir().unwrap();
fs::create_dir_all(source_dir.path().join(".data")).unwrap();
fs::write(source_dir.path().join(".data/file.txt"), "two").unwrap();
tokio::fs::symlink(
source_dir.path().join(".data/file.txt"),
source_dir.path().join("1.txt"),
)
let dest_dir = tempfile::tempdir().unwrap();
let mut entry = Storage::new(protos::Storage {
source: source_dir.path().display().to_string(),
mount_point: dest_dir.path().display().to_string(),
..Default::default()
})
.await
.unwrap();
let logger = slog::Logger::root(slog::Discard, o!());
// create a path we'll remove later
fs::create_dir_all(source_dir.path().join("tmp")).unwrap();
fs::write(&source_dir.path().join("tmp/test-file"), "foo").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 3); // root, ./tmp, test-file
// Verify expected directory, file:
assert_eq!(
std::fs::read_dir(dest_dir.path().join("tmp"))
.unwrap()
.count(),
1
);
assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 1);
// Now, remove directory, and verify that the directory (and its file) are removed:
fs::remove_dir_all(source_dir.path().join("tmp")).unwrap();
thread::sleep(Duration::from_secs(1));
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 0);
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
}
#[tokio::test]
async fn watch_directory_with_symlinks() {
// Prepare source directory:
// ..2021_10_29_03_10_48.161654083/file.txt
// ..data -> ..2021_10_29_03_10_48.161654083
// file.txt -> ..data/file.txt
let source_dir = tempfile::tempdir().unwrap();
let actual_dir = source_dir.path().join("..2021_10_29_03_10_48.161654083");
let actual_file = actual_dir.join("file.txt");
let sym_dir = source_dir.path().join("..data");
let sym_file = source_dir.path().join("file.txt");
// create backing file/path
fs::create_dir_all(&actual_dir).unwrap();
fs::write(&actual_file, "two").unwrap();
// create indirection symlink directory tha points to actual_dir:
tokio::fs::symlink(&actual_dir, &sym_dir).await.unwrap();
// create presented data file symlink:
tokio::fs::symlink(sym_dir.join("file.txt"), sym_file)
.await
.unwrap();
let dest_dir = tempfile::tempdir().unwrap();
@ -907,26 +994,31 @@ mod tests {
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 2);
assert_eq!(entry.scan(&logger).await.unwrap(), 5);
// Should copy no files since nothing is changed since last check
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
// Should copy 1 file
// now what, what is updated?
fs::write(actual_file, "updated").unwrap();
thread::sleep(Duration::from_secs(1));
fs::write(source_dir.path().join(".data/file.txt"), "updated").unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 2);
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
assert_eq!(
fs::read_to_string(dest_dir.path().join(".data/file.txt")).unwrap(),
"updated"
);
assert_eq!(
fs::read_to_string(dest_dir.path().join("1.txt")).unwrap(),
fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(),
"updated"
);
// Verify that resulting 1.txt is a symlink:
assert!(tokio::fs::symlink_metadata(dest_dir.path().join("1.txt"))
// Verify that resulting file.txt is a symlink:
assert!(
tokio::fs::symlink_metadata(dest_dir.path().join("file.txt"))
.await
.unwrap()
.file_type()
.is_symlink()
);
// Verify that .data directory is a symlink:
assert!(tokio::fs::symlink_metadata(&dest_dir.path().join("..data"))
.await
.unwrap()
.file_type()
@ -934,6 +1026,25 @@ mod tests {
// Should copy no new files after copy happened
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
// Now, simulate configmap update.
// - create a new actual dir/file,
// - update the symlink directory to point to this one
// - remove old dir/file
let new_actual_dir = source_dir.path().join("..2021_10_31_03_10_48.161654083");
let new_actual_file = new_actual_dir.join("file.txt");
fs::create_dir_all(&new_actual_dir).unwrap();
fs::write(&new_actual_file, "new configmap").unwrap();
tokio::fs::remove_file(&sym_dir).await.unwrap();
tokio::fs::symlink(&new_actual_dir, &sym_dir).await.unwrap();
tokio::fs::remove_dir_all(&actual_dir).await.unwrap();
assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink
assert_eq!(
fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(),
"new configmap"
);
}
#[tokio::test]
@ -958,7 +1069,7 @@ mod tests {
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 2);
assert_eq!(entry.scan(&logger).await.unwrap(), 5);
// Should copy no files since nothing is changed since last check
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
@ -1028,8 +1139,9 @@ mod tests {
let logger = slog::Logger::root(slog::Discard, o!());
assert_eq!(entry.scan(&logger).await.unwrap(), 1);
assert_eq!(entry.watched_files.len(), 1);
// expect the root directory and the file:
assert_eq!(entry.scan(&logger).await.unwrap(), 2);
assert_eq!(entry.watched_files.len(), 2);
assert!(target_file.exists());
assert!(entry.watched_files.contains_key(&source_file));
@ -1039,7 +1151,7 @@ mod tests {
assert_eq!(entry.scan(&logger).await.unwrap(), 0);
assert_eq!(entry.watched_files.len(), 0);
assert_eq!(entry.watched_files.len(), 1);
assert!(!target_file.exists());
}