Mirror of https://github.com/kata-containers/kata-containers.git
watchers: handle symlinked directories, dir removal
- Even a directory could be a symlink - check for this. This is very common when using configmaps/secrets
- Add unit test to better mimic a configmap, configmap update
- We would never remove directories before. Let's ensure that these are added to the watched_list, and verify in unit tests
- Update unit tests which exercise maximum number of files per entry. There's a change in behavior now that we consider directories/symlinks watchable as well. For these tests, it means we support one less file in a watchable mount.

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
parent 2b6dfe414a
commit 296e76f8ee
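For context on the first point of the commit message: Kubernetes projects configmaps and secrets into a pod as a tree of symlinks, so a watched entry that looks like a directory may itself be a symlink. A minimal sketch of the check the watcher now relies on, using tokio's async fs API (the helper name is illustrative and not part of the kata-agent code):

use std::path::Path;

use anyhow::Result;
use tokio::fs;

// Returns true when `path` itself is a symlink. symlink_metadata() inspects
// the link rather than its target, which is why the diff below queries it
// for files and directories alike.
async fn is_symlink(path: impl AsRef<Path>) -> Result<bool> {
    let meta = fs::symlink_metadata(path.as_ref()).await?;
    Ok(meta.file_type().is_symlink())
}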
@@ -49,7 +49,7 @@ struct Storage {
     /// the source becomes too large, either in number of files (>16) or total size (>1MB).
     watch: bool,
 
-    /// The list of files to watch from the source mount point and updated in the target one.
+    /// The list of files, directories, symlinks to watch from the source mount point and updated in the target one.
     watched_files: HashMap<PathBuf, SystemTime>,
 }
 
@@ -80,9 +80,13 @@ impl Drop for Storage {
 }
 
 async fn copy(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<()> {
-    // if source is a symlink, just create new symlink with same link source
+    // if source is a symlink, create new symlink with same link source. If
+    // the symlink exists, remove and create new one:
     if fs::symlink_metadata(&from).await?.file_type().is_symlink() {
-        fs::symlink(fs::read_link(&from).await?, to).await?;
+        if fs::symlink_metadata(&to).await.is_ok() {
+            fs::remove_file(&to).await?;
+        }
+        fs::symlink(fs::read_link(&from).await?, &to).await?;
     } else {
         fs::copy(from, to).await?;
     }
@@ -103,6 +107,16 @@ impl Storage {
     async fn update_target(&self, logger: &Logger, source_path: impl AsRef<Path>) -> Result<()> {
         let source_file_path = source_path.as_ref();
 
+        // if we are creating a directory: just create it, nothing more to do
+        if source_file_path.symlink_metadata()?.file_type().is_dir() {
+            fs::create_dir_all(source_file_path)
+                .await
+                .with_context(|| {
+                    format!("Unable to mkdir all for {}", source_file_path.display())
+                })?
+        }
+
+        // Assume we are dealing with either a file or a symlink now:
         let dest_file_path = if self.source_mount_point.is_file() {
             // Simple file to file copy
             // Assume target mount is a file path
@@ -139,7 +153,7 @@ impl Storage {
         let mut remove_list = Vec::new();
         let mut updated_files: Vec<PathBuf> = Vec::new();
 
-        // Remove deleted files for tracking list
+        // Remove deleted files for tracking list.
         self.watched_files.retain(|st, _| {
             if st.exists() {
                 true
@@ -151,10 +165,19 @@ impl Storage {
 
         // Delete from target
         for path in remove_list {
-            // File has been deleted, remove it from target mount
             let target = self.make_target_path(path)?;
-            debug!(logger, "Removing file from mount: {}", target.display());
-            let _ = fs::remove_file(target).await;
+            // The target may be a directory or a file. If it is a directory that is removed,
+            // we'll remove all files under that directory as well. Because of this, there's a
+            // chance the target (a subdirectory or file under a prior removed target) was already
+            // removed. Make sure we check if the target exists before checking the metadata, and
+            // don't return an error if the remove fails
+            if target.exists() && target.symlink_metadata()?.file_type().is_dir() {
+                debug!(logger, "Removing a directory: {}", target.display());
+                let _ = fs::remove_dir_all(target).await;
+            } else {
+                debug!(logger, "Removing a file: {}", target.display());
+                let _ = fs::remove_file(target).await;
+            }
         }
 
         // Scan new & changed files
@@ -186,15 +209,16 @@ impl Storage {
         let mut size: u64 = 0;
         debug!(logger, "Scanning path: {}", path.display());
 
-        if path.is_file() {
-            let metadata = path
-                .metadata()
-                .with_context(|| format!("Failed to query metadata for: {}", path.display()))?;
+        let metadata = path
+            .symlink_metadata()
+            .with_context(|| format!("Failed to query metadata for: {}", path.display()))?;
 
-            let modified = metadata
-                .modified()
-                .with_context(|| format!("Failed to get modified date for: {}", path.display()))?;
+        let modified = metadata
+            .modified()
+            .with_context(|| format!("Failed to get modified date for: {}", path.display()))?;
 
+        // Treat files and symlinks the same:
+        if path.is_file() || metadata.file_type().is_symlink() {
             size += metadata.len();
 
             // Insert will return old entry if any
@@ -216,6 +240,16 @@ impl Storage {
                 }
             );
         } else {
+            // Handling regular directories - check to see if this directory is already being tracked, and
+            // track if not:
+            if self
+                .watched_files
+                .insert(path.to_path_buf(), modified)
+                .is_none()
+            {
+                update_list.push(path.to_path_buf());
+            }
+
             // Scan dir recursively
             let mut entries = fs::read_dir(path)
                 .await
@@ -616,7 +650,7 @@ mod tests {
         .unwrap();
 
         // setup storage3: many files, but still watchable
-        for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
+        for i in 1..MAX_ENTRIES_PER_STORAGE {
             fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap();
         }
 
@@ -678,7 +712,7 @@ mod tests {
             std::fs::read_dir(entries.0[3].target_mount_point.as_path())
                 .unwrap()
                 .count(),
-            MAX_ENTRIES_PER_STORAGE
+            MAX_ENTRIES_PER_STORAGE - 1
         );
 
         // Add two files to storage 0, verify it is updated without needing to run check:
@@ -748,7 +782,7 @@ mod tests {
             std::fs::read_dir(entries.0[3].target_mount_point.as_path())
                 .unwrap()
                 .count(),
-            MAX_ENTRIES_PER_STORAGE + 1
+            MAX_ENTRIES_PER_STORAGE
         );
 
         // verify that we can remove files as well, but that it isn't observed until check is run
@@ -826,15 +860,20 @@ mod tests {
         fs::remove_file(source_dir.path().join("big.txt")).unwrap();
         fs::remove_file(source_dir.path().join("too-big.txt")).unwrap();
 
-        // Up to 16 files should be okay:
-        for i in 1..MAX_ENTRIES_PER_STORAGE + 1 {
+        assert_eq!(entry.scan(&logger).await.unwrap(), 0);
+
+        // Up to 15 files should be okay (can watch 15 files + 1 directory)
+        for i in 1..MAX_ENTRIES_PER_STORAGE {
             fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap();
         }
 
-        assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE);
+        assert_eq!(
+            entry.scan(&logger).await.unwrap(),
+            MAX_ENTRIES_PER_STORAGE - 1
+        );
 
-        // 17 files is too many:
-        fs::write(source_dir.path().join("17.txt"), "updated").unwrap();
+        // 16 files wll be too many:
+        fs::write(source_dir.path().join("16.txt"), "updated").unwrap();
         thread::sleep(Duration::from_secs(1));
 
         // Expect to receive a MountTooManyFiles error
@@ -881,19 +920,67 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn watch_directory_with_symlinks() {
-        // Prepare source directory:
-        // ./tmp/.data/file.txt
-        // ./tmp/1.txt -> ./tmp/.data/file.txt
+    async fn watch_directory_verify_dir_removal() {
         let source_dir = tempfile::tempdir().unwrap();
-        fs::create_dir_all(source_dir.path().join(".data")).unwrap();
-        fs::write(source_dir.path().join(".data/file.txt"), "two").unwrap();
-        tokio::fs::symlink(
-            source_dir.path().join(".data/file.txt"),
-            source_dir.path().join("1.txt"),
-        )
+        let dest_dir = tempfile::tempdir().unwrap();
+
+        let mut entry = Storage::new(protos::Storage {
+            source: source_dir.path().display().to_string(),
+            mount_point: dest_dir.path().display().to_string(),
+            ..Default::default()
+        })
         .await
         .unwrap();
+        let logger = slog::Logger::root(slog::Discard, o!());
 
+        // create a path we'll remove later
+        fs::create_dir_all(source_dir.path().join("tmp")).unwrap();
+        fs::write(&source_dir.path().join("tmp/test-file"), "foo").unwrap();
+        assert_eq!(entry.scan(&logger).await.unwrap(), 3); // root, ./tmp, test-file
+
+        // Verify expected directory, file:
+        assert_eq!(
+            std::fs::read_dir(dest_dir.path().join("tmp"))
+                .unwrap()
+                .count(),
+            1
+        );
+        assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 1);
+
+        // Now, remove directory, and verify that the directory (and its file) are removed:
+        fs::remove_dir_all(source_dir.path().join("tmp")).unwrap();
+        thread::sleep(Duration::from_secs(1));
+        assert_eq!(entry.scan(&logger).await.unwrap(), 0);
+
+        assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 0);
+
+        assert_eq!(entry.scan(&logger).await.unwrap(), 0);
+    }
+
+    #[tokio::test]
+    async fn watch_directory_with_symlinks() {
+        // Prepare source directory:
+        // ..2021_10_29_03_10_48.161654083/file.txt
+        // ..data -> ..2021_10_29_03_10_48.161654083
+        // file.txt -> ..data/file.txt
+
+        let source_dir = tempfile::tempdir().unwrap();
+        let actual_dir = source_dir.path().join("..2021_10_29_03_10_48.161654083");
+        let actual_file = actual_dir.join("file.txt");
+        let sym_dir = source_dir.path().join("..data");
+        let sym_file = source_dir.path().join("file.txt");
+
+        // create backing file/path
+        fs::create_dir_all(&actual_dir).unwrap();
+        fs::write(&actual_file, "two").unwrap();
+
+        // create indirection symlink directory tha points to actual_dir:
+        tokio::fs::symlink(&actual_dir, &sym_dir).await.unwrap();
+
+        // create presented data file symlink:
+        tokio::fs::symlink(sym_dir.join("file.txt"), sym_file)
+            .await
+            .unwrap();
+
         let dest_dir = tempfile::tempdir().unwrap();
 
@@ -907,26 +994,31 @@ mod tests {
         let logger = slog::Logger::root(slog::Discard, o!());
 
-        assert_eq!(entry.scan(&logger).await.unwrap(), 2);
+        assert_eq!(entry.scan(&logger).await.unwrap(), 5);
 
         // Should copy no files since nothing is changed since last check
         assert_eq!(entry.scan(&logger).await.unwrap(), 0);
 
         // Should copy 1 file
-        fs::write(source_dir.path().join(".data/file.txt"), "updated").unwrap();
+        // now what, what is updated?
+        fs::write(actual_file, "updated").unwrap();
         thread::sleep(Duration::from_secs(1));
-        assert_eq!(entry.scan(&logger).await.unwrap(), 2);
-        assert_eq!(
-            fs::read_to_string(dest_dir.path().join(".data/file.txt")).unwrap(),
-            "updated"
-        );
+        assert_eq!(entry.scan(&logger).await.unwrap(), 1);
         assert_eq!(
-            fs::read_to_string(dest_dir.path().join("1.txt")).unwrap(),
+            fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(),
             "updated"
         );
 
-        // Verify that resulting 1.txt is a symlink:
-        assert!(tokio::fs::symlink_metadata(dest_dir.path().join("1.txt"))
+        // Verify that resulting file.txt is a symlink:
+        assert!(
+            tokio::fs::symlink_metadata(dest_dir.path().join("file.txt"))
             .await
             .unwrap()
             .file_type()
             .is_symlink()
+        );
+
+        // Verify that .data directory is a symlink:
+        assert!(tokio::fs::symlink_metadata(&dest_dir.path().join("..data"))
+            .await
+            .unwrap()
+            .file_type()
@@ -934,6 +1026,25 @@
 
         // Should copy no new files after copy happened
         assert_eq!(entry.scan(&logger).await.unwrap(), 0);
+
+        // Now, simulate configmap update.
+        // - create a new actual dir/file,
+        // - update the symlink directory to point to this one
+        // - remove old dir/file
+        let new_actual_dir = source_dir.path().join("..2021_10_31_03_10_48.161654083");
+        let new_actual_file = new_actual_dir.join("file.txt");
+        fs::create_dir_all(&new_actual_dir).unwrap();
+        fs::write(&new_actual_file, "new configmap").unwrap();
+
+        tokio::fs::remove_file(&sym_dir).await.unwrap();
+        tokio::fs::symlink(&new_actual_dir, &sym_dir).await.unwrap();
+        tokio::fs::remove_dir_all(&actual_dir).await.unwrap();
+
+        assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink
+        assert_eq!(
+            fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(),
+            "new configmap"
+        );
     }
 
     #[tokio::test]
@@ -958,7 +1069,7 @@ mod tests {
 
         let logger = slog::Logger::root(slog::Discard, o!());
 
-        assert_eq!(entry.scan(&logger).await.unwrap(), 2);
+        assert_eq!(entry.scan(&logger).await.unwrap(), 5);
 
         // Should copy no files since nothing is changed since last check
         assert_eq!(entry.scan(&logger).await.unwrap(), 0);
@@ -1028,8 +1139,9 @@ mod tests {
 
         let logger = slog::Logger::root(slog::Discard, o!());
 
-        assert_eq!(entry.scan(&logger).await.unwrap(), 1);
-        assert_eq!(entry.watched_files.len(), 1);
+        // expect the root directory and the file:
+        assert_eq!(entry.scan(&logger).await.unwrap(), 2);
+        assert_eq!(entry.watched_files.len(), 2);
 
         assert!(target_file.exists());
         assert!(entry.watched_files.contains_key(&source_file));
@@ -1039,7 +1151,7 @@ mod tests {
 
         assert_eq!(entry.scan(&logger).await.unwrap(), 0);
 
-        assert_eq!(entry.watched_files.len(), 0);
+        assert_eq!(entry.watched_files.len(), 1);
         assert!(!target_file.exists());
     }
 
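The removal logic introduced above has to cope with targets that may be files, symlinks, or whole directories, and with paths that were already removed together with a parent directory. A standalone sketch of that pattern using the same tokio fs API (the function name and the choice to swallow errors are illustrative, not the kata-agent interface):

use std::path::Path;

use tokio::fs;

// Best-effort removal of a target that may be a file, a symlink, or a
// directory whose contents were copied earlier. Errors are ignored because
// the path may already be gone if a parent directory was removed first.
async fn remove_target(target: impl AsRef<Path>) {
    let target = target.as_ref();
    match fs::symlink_metadata(target).await {
        Ok(meta) if meta.file_type().is_dir() => {
            let _ = fs::remove_dir_all(target).await;
        }
        Ok(_) => {
            let _ = fs::remove_file(target).await;
        }
        // Nothing left to remove.
        Err(_) => {}
    }
}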