diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index 187d85ccc8..fd0f9fe861 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -49,7 +49,7 @@ struct Storage { /// the source becomes too large, either in number of files (>16) or total size (>1MB). watch: bool, - /// The list of files to watch from the source mount point and updated in the target one. + /// The list of files, directories, symlinks to watch from the source mount point and updated in the target one. watched_files: HashMap, } @@ -80,9 +80,13 @@ impl Drop for Storage { } async fn copy(from: impl AsRef, to: impl AsRef) -> Result<()> { - // if source is a symlink, just create new symlink with same link source if fs::symlink_metadata(&from).await?.file_type().is_symlink() { - fs::symlink(fs::read_link(&from).await?, to).await?; + // if source is a symlink, create new symlink with same link source. If + // the symlink exists, remove and create new one: + if fs::symlink_metadata(&to).await.is_ok() { + fs::remove_file(&to).await?; + } + fs::symlink(fs::read_link(&from).await?, &to).await?; } else { fs::copy(from, to).await?; } @@ -103,6 +107,16 @@ impl Storage { async fn update_target(&self, logger: &Logger, source_path: impl AsRef) -> Result<()> { let source_file_path = source_path.as_ref(); + // if we are creating a directory: just create it, nothing more to do + if source_file_path.symlink_metadata()?.file_type().is_dir() { + fs::create_dir_all(source_file_path) + .await + .with_context(|| { + format!("Unable to mkdir all for {}", source_file_path.display()) + })? + } + + // Assume we are dealing with either a file or a symlink now: let dest_file_path = if self.source_mount_point.is_file() { // Simple file to file copy // Assume target mount is a file path @@ -139,7 +153,7 @@ impl Storage { let mut remove_list = Vec::new(); let mut updated_files: Vec = Vec::new(); - // Remove deleted files for tracking list + // Remove deleted files for tracking list. self.watched_files.retain(|st, _| { if st.exists() { true @@ -151,10 +165,19 @@ impl Storage { // Delete from target for path in remove_list { - // File has been deleted, remove it from target mount let target = self.make_target_path(path)?; - debug!(logger, "Removing file from mount: {}", target.display()); - let _ = fs::remove_file(target).await; + // The target may be a directory or a file. If it is a directory that is removed, + // we'll remove all files under that directory as well. Because of this, there's a + // chance the target (a subdirectory or file under a prior removed target) was already + // removed. Make sure we check if the target exists before checking the metadata, and + // don't return an error if the remove fails + if target.exists() && target.symlink_metadata()?.file_type().is_dir() { + debug!(logger, "Removing a directory: {}", target.display()); + let _ = fs::remove_dir_all(target).await; + } else { + debug!(logger, "Removing a file: {}", target.display()); + let _ = fs::remove_file(target).await; + } } // Scan new & changed files @@ -186,15 +209,16 @@ impl Storage { let mut size: u64 = 0; debug!(logger, "Scanning path: {}", path.display()); - if path.is_file() { - let metadata = path - .metadata() - .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; + let metadata = path + .symlink_metadata() + .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; - let modified = metadata - .modified() - .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + let modified = metadata + .modified() + .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + // Treat files and symlinks the same: + if path.is_file() || metadata.file_type().is_symlink() { size += metadata.len(); // Insert will return old entry if any @@ -216,6 +240,16 @@ impl Storage { } ); } else { + // Handling regular directories - check to see if this directory is already being tracked, and + // track if not: + if self + .watched_files + .insert(path.to_path_buf(), modified) + .is_none() + { + update_list.push(path.to_path_buf()); + } + // Scan dir recursively let mut entries = fs::read_dir(path) .await @@ -616,7 +650,7 @@ mod tests { .unwrap(); // setup storage3: many files, but still watchable - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap(); } @@ -678,7 +712,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + MAX_ENTRIES_PER_STORAGE - 1 ); // Add two files to storage 0, verify it is updated without needing to run check: @@ -748,7 +782,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + 1 + MAX_ENTRIES_PER_STORAGE ); // verify that we can remove files as well, but that it isn't observed until check is run @@ -826,15 +860,20 @@ mod tests { fs::remove_file(source_dir.path().join("big.txt")).unwrap(); fs::remove_file(source_dir.path().join("too-big.txt")).unwrap(); - // Up to 16 files should be okay: - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Up to 15 files should be okay (can watch 15 files + 1 directory) + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap(); } - assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE); + assert_eq!( + entry.scan(&logger).await.unwrap(), + MAX_ENTRIES_PER_STORAGE - 1 + ); - // 17 files is too many: - fs::write(source_dir.path().join("17.txt"), "updated").unwrap(); + // 16 files wll be too many: + fs::write(source_dir.path().join("16.txt"), "updated").unwrap(); thread::sleep(Duration::from_secs(1)); // Expect to receive a MountTooManyFiles error @@ -881,19 +920,67 @@ mod tests { } #[tokio::test] - async fn watch_directory_with_symlinks() { - // Prepare source directory: - // ./tmp/.data/file.txt - // ./tmp/1.txt -> ./tmp/.data/file.txt + async fn watch_directory_verify_dir_removal() { let source_dir = tempfile::tempdir().unwrap(); - fs::create_dir_all(source_dir.path().join(".data")).unwrap(); - fs::write(source_dir.path().join(".data/file.txt"), "two").unwrap(); - tokio::fs::symlink( - source_dir.path().join(".data/file.txt"), - source_dir.path().join("1.txt"), - ) + let dest_dir = tempfile::tempdir().unwrap(); + + let mut entry = Storage::new(protos::Storage { + source: source_dir.path().display().to_string(), + mount_point: dest_dir.path().display().to_string(), + ..Default::default() + }) .await .unwrap(); + let logger = slog::Logger::root(slog::Discard, o!()); + + // create a path we'll remove later + fs::create_dir_all(source_dir.path().join("tmp")).unwrap(); + fs::write(&source_dir.path().join("tmp/test-file"), "foo").unwrap(); + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // root, ./tmp, test-file + + // Verify expected directory, file: + assert_eq!( + std::fs::read_dir(dest_dir.path().join("tmp")) + .unwrap() + .count(), + 1 + ); + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 1); + + // Now, remove directory, and verify that the directory (and its file) are removed: + fs::remove_dir_all(source_dir.path().join("tmp")).unwrap(); + thread::sleep(Duration::from_secs(1)); + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 0); + + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + } + + #[tokio::test] + async fn watch_directory_with_symlinks() { + // Prepare source directory: + // ..2021_10_29_03_10_48.161654083/file.txt + // ..data -> ..2021_10_29_03_10_48.161654083 + // file.txt -> ..data/file.txt + + let source_dir = tempfile::tempdir().unwrap(); + let actual_dir = source_dir.path().join("..2021_10_29_03_10_48.161654083"); + let actual_file = actual_dir.join("file.txt"); + let sym_dir = source_dir.path().join("..data"); + let sym_file = source_dir.path().join("file.txt"); + + // create backing file/path + fs::create_dir_all(&actual_dir).unwrap(); + fs::write(&actual_file, "two").unwrap(); + + // create indirection symlink directory tha points to actual_dir: + tokio::fs::symlink(&actual_dir, &sym_dir).await.unwrap(); + + // create presented data file symlink: + tokio::fs::symlink(sym_dir.join("file.txt"), sym_file) + .await + .unwrap(); let dest_dir = tempfile::tempdir().unwrap(); @@ -907,26 +994,31 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 5); // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); - // Should copy 1 file + // now what, what is updated? + fs::write(actual_file, "updated").unwrap(); thread::sleep(Duration::from_secs(1)); - fs::write(source_dir.path().join(".data/file.txt"), "updated").unwrap(); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!( - fs::read_to_string(dest_dir.path().join(".data/file.txt")).unwrap(), - "updated" - ); - assert_eq!( - fs::read_to_string(dest_dir.path().join("1.txt")).unwrap(), + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), "updated" ); - // Verify that resulting 1.txt is a symlink: - assert!(tokio::fs::symlink_metadata(dest_dir.path().join("1.txt")) + // Verify that resulting file.txt is a symlink: + assert!( + tokio::fs::symlink_metadata(dest_dir.path().join("file.txt")) + .await + .unwrap() + .file_type() + .is_symlink() + ); + + // Verify that .data directory is a symlink: + assert!(tokio::fs::symlink_metadata(&dest_dir.path().join("..data")) .await .unwrap() .file_type() @@ -934,6 +1026,25 @@ mod tests { // Should copy no new files after copy happened assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Now, simulate configmap update. + // - create a new actual dir/file, + // - update the symlink directory to point to this one + // - remove old dir/file + let new_actual_dir = source_dir.path().join("..2021_10_31_03_10_48.161654083"); + let new_actual_file = new_actual_dir.join("file.txt"); + fs::create_dir_all(&new_actual_dir).unwrap(); + fs::write(&new_actual_file, "new configmap").unwrap(); + + tokio::fs::remove_file(&sym_dir).await.unwrap(); + tokio::fs::symlink(&new_actual_dir, &sym_dir).await.unwrap(); + tokio::fs::remove_dir_all(&actual_dir).await.unwrap(); + + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink + assert_eq!( + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), + "new configmap" + ); } #[tokio::test] @@ -958,7 +1069,7 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 5); // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); @@ -1028,8 +1139,9 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 1); - assert_eq!(entry.watched_files.len(), 1); + // expect the root directory and the file: + assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.watched_files.len(), 2); assert!(target_file.exists()); assert!(entry.watched_files.contains_key(&source_file)); @@ -1039,7 +1151,7 @@ mod tests { assert_eq!(entry.scan(&logger).await.unwrap(), 0); - assert_eq!(entry.watched_files.len(), 0); + assert_eq!(entry.watched_files.len(), 1); assert!(!target_file.exists()); }