diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index b111aa166f..bb3fb16411 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -49,7 +49,7 @@ struct Storage { /// the source becomes too large, either in number of files (>16) or total size (>1MB). watch: bool, - /// The list of files to watch from the source mount point and updated in the target one. + /// The list of files, directories, symlinks to watch from the source mount point and updated in the target one. watched_files: HashMap, } @@ -79,6 +79,20 @@ impl Drop for Storage { } } +async fn copy(from: impl AsRef, to: impl AsRef) -> Result<()> { + if fs::symlink_metadata(&from).await?.file_type().is_symlink() { + // if source is a symlink, create new symlink with same link source. If + // the symlink exists, remove and create new one: + if fs::symlink_metadata(&to).await.is_ok() { + fs::remove_file(&to).await?; + } + fs::symlink(fs::read_link(&from).await?, &to).await?; + } else { + fs::copy(from, to).await?; + } + Ok(()) +} + impl Storage { async fn new(storage: protos::Storage) -> Result { let entry = Storage { @@ -93,6 +107,16 @@ impl Storage { async fn update_target(&self, logger: &Logger, source_path: impl AsRef) -> Result<()> { let source_file_path = source_path.as_ref(); + // if we are creating a directory: just create it, nothing more to do + if source_file_path.symlink_metadata()?.file_type().is_dir() { + fs::create_dir_all(source_file_path) + .await + .with_context(|| { + format!("Unable to mkdir all for {}", source_file_path.display()) + })? + } + + // Assume we are dealing with either a file or a symlink now: let dest_file_path = if self.source_mount_point.is_file() { // Simple file to file copy // Assume target mount is a file path @@ -110,19 +134,13 @@ impl Storage { dest_file_path }; - debug!( - logger, - "Copy from {} to {}", - source_file_path.display(), - dest_file_path.display() - ); - fs::copy(&source_file_path, &dest_file_path) + copy(&source_file_path, &dest_file_path) .await .with_context(|| { format!( "Copy from {} to {} failed", source_file_path.display(), - dest_file_path.display() + dest_file_path.display(), ) })?; @@ -135,7 +153,7 @@ impl Storage { let mut remove_list = Vec::new(); let mut updated_files: Vec = Vec::new(); - // Remove deleted files for tracking list + // Remove deleted files for tracking list. self.watched_files.retain(|st, _| { if st.exists() { true @@ -147,10 +165,19 @@ impl Storage { // Delete from target for path in remove_list { - // File has been deleted, remove it from target mount let target = self.make_target_path(path)?; - debug!(logger, "Removing file from mount: {}", target.display()); - let _ = fs::remove_file(target).await; + // The target may be a directory or a file. If it is a directory that is removed, + // we'll remove all files under that directory as well. Because of this, there's a + // chance the target (a subdirectory or file under a prior removed target) was already + // removed. Make sure we check if the target exists before checking the metadata, and + // don't return an error if the remove fails + if target.exists() && target.symlink_metadata()?.file_type().is_dir() { + debug!(logger, "Removing a directory: {}", target.display()); + let _ = fs::remove_dir_all(target).await; + } else { + debug!(logger, "Removing a file: {}", target.display()); + let _ = fs::remove_file(target).await; + } } // Scan new & changed files @@ -182,15 +209,16 @@ impl Storage { let mut size: u64 = 0; debug!(logger, "Scanning path: {}", path.display()); - if path.is_file() { - let metadata = path - .metadata() - .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; + let metadata = path + .symlink_metadata() + .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; - let modified = metadata - .modified() - .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + let modified = metadata + .modified() + .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + // Treat files and symlinks the same: + if path.is_file() || metadata.file_type().is_symlink() { size += metadata.len(); // Insert will return old entry if any @@ -212,6 +240,16 @@ impl Storage { } ); } else { + // Handling regular directories - check to see if this directory is already being tracked, and + // track if not: + if self + .watched_files + .insert(path.to_path_buf(), modified) + .is_none() + { + update_list.push(path.to_path_buf()); + } + // Scan dir recursively let mut entries = fs::read_dir(path) .await @@ -612,7 +650,7 @@ mod tests { .unwrap(); // setup storage3: many files, but still watchable - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap(); } @@ -622,6 +660,9 @@ mod tests { ..Default::default() }; + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + entries .add(std::iter::once(storage0), &logger) .await @@ -674,7 +715,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + MAX_ENTRIES_PER_STORAGE - 1 ); // Add two files to storage 0, verify it is updated without needing to run check: @@ -692,6 +733,9 @@ mod tests { "updated" ); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + // // Prepare for second check: update mount sources // @@ -744,7 +788,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + 1 + MAX_ENTRIES_PER_STORAGE ); // verify that we can remove files as well, but that it isn't observed until check is run @@ -822,15 +866,20 @@ mod tests { fs::remove_file(source_dir.path().join("big.txt")).unwrap(); fs::remove_file(source_dir.path().join("too-big.txt")).unwrap(); - // Up to 16 files should be okay: - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Up to 15 files should be okay (can watch 15 files + 1 directory) + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap(); } - assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE); + assert_eq!( + entry.scan(&logger).await.unwrap(), + MAX_ENTRIES_PER_STORAGE - 1 + ); - // 17 files is too many: - fs::write(source_dir.path().join("17.txt"), "updated").unwrap(); + // 16 files wll be too many: + fs::write(source_dir.path().join("16.txt"), "updated").unwrap(); thread::sleep(Duration::from_secs(1)); // Expect to receive a MountTooManyFiles error @@ -843,6 +892,180 @@ mod tests { } } + #[tokio::test] + async fn test_copy() { + // prepare tmp src/destination + let source_dir = tempfile::tempdir().unwrap(); + let dest_dir = tempfile::tempdir().unwrap(); + + // verify copy of a regular file + let src_file = source_dir.path().join("file.txt"); + let dst_file = dest_dir.path().join("file.txt"); + fs::write(&src_file, "foo").unwrap(); + copy(&src_file, &dst_file).await.unwrap(); + // verify destination: + assert!(!fs::symlink_metadata(dst_file) + .unwrap() + .file_type() + .is_symlink()); + + // verify copy of a symlink + let src_symlink_file = source_dir.path().join("symlink_file.txt"); + let dst_symlink_file = dest_dir.path().join("symlink_file.txt"); + tokio::fs::symlink(&src_file, &src_symlink_file) + .await + .unwrap(); + copy(src_symlink_file, &dst_symlink_file).await.unwrap(); + // verify destination: + assert!(fs::symlink_metadata(&dst_symlink_file) + .unwrap() + .file_type() + .is_symlink()); + assert_eq!(fs::read_link(&dst_symlink_file).unwrap(), src_file); + assert_eq!(fs::read_to_string(&dst_symlink_file).unwrap(), "foo") + } + + #[tokio::test] + async fn watch_directory_verify_dir_removal() { + let source_dir = tempfile::tempdir().unwrap(); + let dest_dir = tempfile::tempdir().unwrap(); + + let mut entry = Storage::new(protos::Storage { + source: source_dir.path().display().to_string(), + mount_point: dest_dir.path().display().to_string(), + ..Default::default() + }) + .await + .unwrap(); + let logger = slog::Logger::root(slog::Discard, o!()); + + // create a path we'll remove later + fs::create_dir_all(source_dir.path().join("tmp")).unwrap(); + fs::write(&source_dir.path().join("tmp/test-file"), "foo").unwrap(); + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // root, ./tmp, test-file + + // Verify expected directory, file: + assert_eq!( + std::fs::read_dir(dest_dir.path().join("tmp")) + .unwrap() + .count(), + 1 + ); + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 1); + + // Now, remove directory, and verify that the directory (and its file) are removed: + fs::remove_dir_all(source_dir.path().join("tmp")).unwrap(); + thread::sleep(Duration::from_secs(1)); + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 0); + + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + } + + #[tokio::test] + async fn watch_directory_with_symlinks() { + // Prepare source directory: + // ..2021_10_29_03_10_48.161654083/file.txt + // ..data -> ..2021_10_29_03_10_48.161654083 + // file.txt -> ..data/file.txt + + let source_dir = tempfile::tempdir().unwrap(); + let actual_dir = source_dir.path().join("..2021_10_29_03_10_48.161654083"); + let actual_file = actual_dir.join("file.txt"); + let sym_dir = source_dir.path().join("..data"); + let sym_file = source_dir.path().join("file.txt"); + + let relative_to_dir = PathBuf::from("..2021_10_29_03_10_48.161654083"); + + // create backing file/path + fs::create_dir_all(&actual_dir).unwrap(); + fs::write(&actual_file, "two").unwrap(); + + // create indirection symlink directory that points to the directory that holds the actual file: + tokio::fs::symlink(&relative_to_dir, &sym_dir) + .await + .unwrap(); + + // create presented data file symlink: + tokio::fs::symlink(PathBuf::from("..data/file.txt"), sym_file) + .await + .unwrap(); + + let dest_dir = tempfile::tempdir().unwrap(); + + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + + let mut entry = Storage::new(protos::Storage { + source: source_dir.path().display().to_string(), + mount_point: dest_dir.path().display().to_string(), + ..Default::default() + }) + .await + .unwrap(); + + let logger = slog::Logger::root(slog::Discard, o!()); + + assert_eq!(entry.scan(&logger).await.unwrap(), 5); + + // Should copy no files since nothing is changed since last check + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // now what, what is updated? + fs::write(actual_file, "updated").unwrap(); + + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + + assert_eq!(entry.scan(&logger).await.unwrap(), 1); + + assert_eq!( + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), + "updated" + ); + + // Verify that resulting file.txt is a symlink: + assert!( + tokio::fs::symlink_metadata(dest_dir.path().join("file.txt")) + .await + .unwrap() + .file_type() + .is_symlink() + ); + + // Verify that .data directory is a symlink: + assert!(tokio::fs::symlink_metadata(&dest_dir.path().join("..data")) + .await + .unwrap() + .file_type() + .is_symlink()); + + // Should copy no new files after copy happened + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Now, simulate configmap update. + // - create a new actual dir/file, + // - update the symlink directory to point to this one + // - remove old dir/file + let new_actual_dir = source_dir.path().join("..2021_10_31"); + let new_actual_file = new_actual_dir.join("file.txt"); + fs::create_dir_all(&new_actual_dir).unwrap(); + fs::write(&new_actual_file, "new configmap").unwrap(); + + tokio::fs::remove_file(&sym_dir).await.unwrap(); + tokio::fs::symlink(PathBuf::from("..2021_10_31"), &sym_dir) + .await + .unwrap(); + tokio::fs::remove_dir_all(&actual_dir).await.unwrap(); + + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink + assert_eq!( + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), + "new configmap" + ); + } + #[tokio::test] async fn watch_directory() { // Prepare source directory: @@ -853,6 +1076,9 @@ mod tests { fs::create_dir_all(source_dir.path().join("A/B")).unwrap(); fs::write(source_dir.path().join("A/B/1.txt"), "two").unwrap(); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + let dest_dir = tempfile::tempdir().unwrap(); let mut entry = Storage::new(protos::Storage { @@ -865,13 +1091,11 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 5); // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); - // Should copy 1 file - thread::sleep(Duration::from_secs(1)); fs::write(source_dir.path().join("A/B/1.txt"), "updated").unwrap(); assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!( @@ -879,6 +1103,9 @@ mod tests { "updated" ); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + // Should copy no new files after copy happened assert_eq!(entry.scan(&logger).await.unwrap(), 0); @@ -909,7 +1136,9 @@ mod tests { assert_eq!(entry.scan(&logger).await.unwrap(), 1); - thread::sleep(Duration::from_secs(1)); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + fs::write(&source_file, "two").unwrap(); assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!(fs::read_to_string(&dest_file).unwrap(), "two"); @@ -935,8 +1164,9 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 1); - assert_eq!(entry.watched_files.len(), 1); + // expect the root directory and the file: + assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.watched_files.len(), 2); assert!(target_file.exists()); assert!(entry.watched_files.contains_key(&source_file)); @@ -946,7 +1176,7 @@ mod tests { assert_eq!(entry.scan(&logger).await.unwrap(), 0); - assert_eq!(entry.watched_files.len(), 0); + assert_eq!(entry.watched_files.len(), 1); assert!(!target_file.exists()); } @@ -992,6 +1222,8 @@ mod tests { watcher.mount(&logger).await.unwrap(); assert!(is_mounted(WATCH_MOUNT_POINT_PATH).unwrap()); + thread::sleep(Duration::from_millis(20)); + watcher.cleanup(); assert!(!is_mounted(WATCH_MOUNT_POINT_PATH).unwrap()); }