diff --git a/src/tools/genpolicy/Cargo.lock b/src/tools/genpolicy/Cargo.lock index f25efe752e..55b28323ff 100644 --- a/src/tools/genpolicy/Cargo.lock +++ b/src/tools/genpolicy/Cargo.lock @@ -536,6 +536,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -619,6 +629,7 @@ dependencies = [ "docker_credential", "env_logger", "flate2", + "fs2", "generic-array", "k8s-cri", "log", diff --git a/src/tools/genpolicy/Cargo.toml b/src/tools/genpolicy/Cargo.toml index 93beebb866..a093b5cc7c 100644 --- a/src/tools/genpolicy/Cargo.toml +++ b/src/tools/genpolicy/Cargo.toml @@ -59,6 +59,7 @@ sha2 = "0.10.6" tarindex = { git = "https://github.com/kata-containers/tardev-snapshotter", rev = "06183a5" } tempfile = "3.5.0" zerocopy = "0.6.1" +fs2 = "0.4.3" # containerd image pull support k8s-cri = "0.7.0" diff --git a/src/tools/genpolicy/src/registry.rs b/src/tools/genpolicy/src/registry.rs index 7fb7228c68..87d81fb2cb 100644 --- a/src/tools/genpolicy/src/registry.rs +++ b/src/tools/genpolicy/src/registry.rs @@ -8,20 +8,23 @@ use crate::containerd; use crate::policy; +use crate::utils::Config; use crate::verity; -use crate::utils::Config; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use docker_credential::{CredentialRetrievalError, DockerCredential}; -use log::warn; -use log::{debug, info, LevelFilter}; -use oci_distribution::client::{linux_amd64_resolver, ClientConfig, ClientProtocol}; -use oci_distribution::{manifest, secrets::RegistryAuth, Client, Reference}; +use fs2::FileExt; +use log::{debug, info, warn, LevelFilter}; +use oci_distribution::{ + client::{linux_amd64_resolver, ClientConfig}, + manifest, + secrets::RegistryAuth, + Client, Reference, +}; use serde::{Deserialize, Serialize}; use sha2::{digest::typenum::Unsigned, digest::OutputSizeUser, Sha256}; -use std::io::{self, Seek, Write}; -use std::path::{Path, PathBuf}; -use tokio::{fs, io::AsyncWriteExt}; +use std::{fs::OpenOptions, io, io::BufWriter, io::Seek, io::Write, path::Path}; +use tokio::io::AsyncWriteExt; /// Container image properties obtained from an OCI repository. #[derive(Clone, Debug, Default)] @@ -57,21 +60,20 @@ pub struct DockerRootfs { } /// This application's image layer properties. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ImageLayer { pub diff_id: String, pub verity_hash: String, } impl Container { - pub async fn new(config: &Config, image: &str) -> Result { + pub async fn new(use_cached_files: bool, image: &str) -> Result { info!("============================================"); info!("Pulling manifest and config for {:?}", image); let reference: Reference = image.to_string().parse().unwrap(); let auth = build_auth(&reference); let mut client = Client::new(ClientConfig { - protocol: ClientProtocol::HttpsExcept(config.insecure_registries.clone()), platform_resolver: Some(Box::new(linux_amd64_resolver)), ..Default::default() }); @@ -94,7 +96,7 @@ impl Container { let config_layer: DockerConfigLayer = serde_json::from_str(&config_layer_str).unwrap(); let image_layers = get_image_layers( - config.use_cache, + use_cached_files, &mut client, &reference, &manifest, @@ -248,6 +250,7 @@ async fn get_image_layers( client, reference, &layer.digest, + &config_layer.rootfs.diff_ids[layer_index].clone(), ) .await?, }); @@ -262,125 +265,160 @@ async fn get_image_layers( Ok(layers) } -pub fn get_verity_path(base_dir: &Path, file_name: &str) -> PathBuf { - let mut verity_path: PathBuf = base_dir.join(file_name); - verity_path.set_extension("verity"); - verity_path -} - -pub fn get_decompressed_path(verity_path: &Path) -> PathBuf { - let mut decompressed_path = verity_path.to_path_buf().clone(); - decompressed_path.set_extension("tar"); - decompressed_path -} - -pub fn get_compressed_path(decompressed_path: &Path) -> PathBuf { - let mut compressed_path = decompressed_path.to_path_buf().clone(); - compressed_path.set_extension("gz"); - compressed_path -} - -pub async fn delete_files(base_dir: &Path, file_name: &str) { - let verity_path = get_verity_path(base_dir, file_name); - let _ = fs::remove_file(&verity_path).await; - - let decompressed_path = get_decompressed_path(&verity_path); - let _ = fs::remove_file(&decompressed_path).await; - - let compressed_path = get_compressed_path(&decompressed_path); - let _ = fs::remove_file(&compressed_path).await; -} - async fn get_verity_hash( use_cached_files: bool, client: &mut Client, reference: &Reference, layer_digest: &str, + diff_id: &str, ) -> Result { + let temp_dir = tempfile::tempdir_in(".")?; + let base_dir = temp_dir.path(); + let cache_file = "layers-cache.json"; // Use file names supported by both Linux and Windows. let file_name = str::replace(layer_digest, ":", "-"); + let mut decompressed_path = base_dir.join(file_name); + decompressed_path.set_extension("tar"); - let base_dir = std::path::Path::new("layers_cache"); - let verity_path = get_verity_path(base_dir, &file_name); + let mut compressed_path = decompressed_path.clone(); + compressed_path.set_extension("gz"); - if use_cached_files && verity_path.exists() { - info!("Using cached file {:?}", &verity_path); - } else if let Err(e) = create_verity_hash_file( - use_cached_files, - client, - reference, - layer_digest, - base_dir, - &get_decompressed_path(&verity_path), - ) - .await - { - delete_files(base_dir, &file_name).await; - bail!("{e}"); + let mut verity_hash = "".to_string(); + let mut error_message = "".to_string(); + let mut error = false; + + // get value from store and return if it exists + if use_cached_files { + verity_hash = read_verity_from_store(cache_file, diff_id)?; + info!("Using cache file"); + info!("dm-verity root hash: {verity_hash}"); } - match std::fs::read_to_string(&verity_path) { - Err(e) => { - delete_files(base_dir, &file_name).await; - bail!("Failed to read {:?}, error {e}", &verity_path); - } - Ok(v) => { - if !use_cached_files { - let _ = std::fs::remove_dir_all(base_dir); - } - info!("dm-verity root hash: {v}"); - Ok(v) - } - } -} - -async fn create_verity_hash_file( - use_cached_files: bool, - client: &mut Client, - reference: &Reference, - layer_digest: &str, - base_dir: &Path, - decompressed_path: &PathBuf, -) -> Result<()> { - if use_cached_files && decompressed_path.exists() { - info!("Using cached file {:?}", &decompressed_path); - } else { - std::fs::create_dir_all(base_dir)?; - create_decompressed_layer_file( - use_cached_files, + // create the layer files + if verity_hash.is_empty() { + if let Err(e) = create_decompressed_layer_file( client, reference, layer_digest, - decompressed_path, + &decompressed_path, + &compressed_path, ) - .await?; + .await + { + error_message = format!("Failed to create verity hash for {layer_digest}, error {e}"); + error = true + }; + + if !error { + match get_verity_hash_value(&decompressed_path) { + Err(e) => { + error_message = format!("Failed to get verity hash {e}"); + error = true; + } + Ok(v) => { + verity_hash = v; + if use_cached_files { + add_verity_to_store(cache_file, diff_id, &verity_hash)?; + } + info!("dm-verity root hash: {verity_hash}"); + } + } + } } - do_create_verity_hash_file(decompressed_path) + temp_dir.close()?; + if error { + // remove the cache file if we're using it + if use_cached_files { + std::fs::remove_file(cache_file)?; + } + warn!("{error_message}"); + } + Ok(verity_hash) +} + +// the store is a json file that matches layer hashes to verity hashes +pub fn add_verity_to_store(cache_file: &str, diff_id: &str, verity_hash: &str) -> Result<()> { + // open the json file in read mode, create it if it doesn't exist + let read_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(cache_file)?; + + let mut data: Vec = if let Ok(vec) = serde_json::from_reader(read_file) { + vec + } else { + // Delete the malformed file here if it's present + Vec::new() + }; + + // Add new data to the deserialized JSON + data.push(ImageLayer { + diff_id: diff_id.to_string(), + verity_hash: verity_hash.to_string(), + }); + + // Serialize in pretty format + let serialized = serde_json::to_string_pretty(&data)?; + + // Open the JSON file to write + let file = OpenOptions::new().write(true).open(cache_file)?; + + // try to lock the file, if it fails, get the error + let result = file.try_lock_exclusive(); + if result.is_err() { + warn!("Waiting to lock file: {cache_file}"); + file.lock_exclusive()?; + } + // Write the serialized JSON to the file + let mut writer = BufWriter::new(&file); + writeln!(writer, "{}", serialized)?; + writer.flush()?; + file.unlock()?; + Ok(()) +} + +// helper function to read the verity hash from the store +// returns empty string if not found or file does not exist +pub fn read_verity_from_store(cache_file: &str, diff_id: &str) -> Result { + match OpenOptions::new().read(true).open(cache_file) { + Ok(file) => match serde_json::from_reader(file) { + Result::, _>::Ok(layers) => { + for layer in layers { + if layer.diff_id == diff_id { + return Ok(layer.verity_hash); + } + } + } + Err(e) => { + warn!("read_verity_from_store: failed to read cached image layers: {e}"); + } + }, + Err(e) => { + info!("read_verity_from_store: failed to open cache file: {e}"); + } + } + + Ok(String::new()) } async fn create_decompressed_layer_file( - use_cached_files: bool, client: &mut Client, reference: &Reference, layer_digest: &str, - decompressed_path: &PathBuf, + decompressed_path: &Path, + compressed_path: &Path, ) -> Result<()> { - let compressed_path = get_compressed_path(decompressed_path); - - if use_cached_files && compressed_path.exists() { - info!("Using cached file {:?}", &compressed_path); - } else { - info!("Pulling layer {layer_digest}"); - let mut file = tokio::fs::File::create(&compressed_path) - .await - .map_err(|e| anyhow!(e))?; - client - .pull_blob(reference, layer_digest, &mut file) - .await - .map_err(|e| anyhow!(e))?; - file.flush().await.map_err(|e| anyhow!(e))?; - } + info!("Pulling layer {:?}", layer_digest); + let mut file = tokio::fs::File::create(&compressed_path) + .await + .map_err(|e| anyhow!(e))?; + client + .pull_blob(reference, layer_digest, &mut file) + .await + .map_err(|e| anyhow!(e))?; + file.flush().await.map_err(|e| anyhow!(e))?; info!("Decompressing layer"); let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?; @@ -401,15 +439,12 @@ async fn create_decompressed_layer_file( Ok(()) } -pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> { +pub fn get_verity_hash_value(path: &Path) -> Result { info!("Calculating dm-verity root hash"); - let mut file = std::fs::File::open(decompressed_path)?; + let mut file = std::fs::File::open(path)?; let size = file.seek(std::io::SeekFrom::End(0))?; if size < 4096 { - return Err(anyhow!( - "Block device {:?} is too small: {size}", - &decompressed_path - )); + return Err(anyhow!("Block device {:?} is too small: {size}", &path)); } let salt = [0u8; ::OutputSize::USIZE]; @@ -417,21 +452,14 @@ pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> { let hash = verity::traverse_file(&mut file, 0, false, v, &mut verity::no_write)?; let result = format!("{:x}", hash); - let mut verity_path = decompressed_path.clone(); - verity_path.set_extension("verity"); - let mut verity_file = std::fs::File::create(verity_path).map_err(|e| anyhow!(e))?; - verity_file - .write_all(result.as_bytes()) - .map_err(|e| anyhow!(e))?; - verity_file.flush().map_err(|e| anyhow!(e))?; - - Ok(()) + Ok(result) } + pub async fn get_container(config: &Config, image: &str) -> Result { if let Some(socket_path) = &config.containerd_socket_path { return Container::new_containerd_pull(config.use_cache, image, socket_path).await; } - Container::new(config, image).await + Container::new(config.use_cache, image).await } fn build_auth(reference: &Reference) -> RegistryAuth { diff --git a/src/tools/genpolicy/src/registry_containerd.rs b/src/tools/genpolicy/src/registry_containerd.rs index 8d084dc258..c63a73f3d9 100644 --- a/src/tools/genpolicy/src/registry_containerd.rs +++ b/src/tools/genpolicy/src/registry_containerd.rs @@ -6,23 +6,22 @@ // Allow Docker image config field names. #![allow(non_snake_case)] use crate::registry::{ - delete_files, do_create_verity_hash_file, get_compressed_path, get_decompressed_path, - get_verity_path, Container, DockerConfigLayer, ImageLayer, + add_verity_to_store, get_verity_hash_value, read_verity_from_store, Container, + DockerConfigLayer, ImageLayer, }; -use anyhow::{anyhow, bail, Result}; -use containerd_client::services::v1::GetImageRequest; -use containerd_client::with_namespace; + +use anyhow::{anyhow, Result}; +use containerd_client::{services::v1::GetImageRequest, with_namespace}; use docker_credential::{CredentialRetrievalError, DockerCredential}; use k8s_cri::v1::{image_service_client::ImageServiceClient, AuthConfig}; -use log::warn; -use log::{debug, info}; +use log::{debug, info, warn}; use oci_distribution::Reference; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::{io::Seek, io::Write, path::Path, path::PathBuf}; -use tokio::io; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; -use tokio::net::UnixStream; +use std::{collections::HashMap, convert::TryFrom, io::Seek, io::Write, path::Path}; +use tokio::{ + io, + io::{AsyncSeekExt, AsyncWriteExt}, + net::UnixStream, +}; use tonic::transport::{Endpoint, Uri}; use tonic::Request; use tower::service_fn; @@ -263,8 +262,9 @@ pub async fn get_image_layers( diff_id: config_layer.rootfs.diff_ids[layer_index].clone(), verity_hash: get_verity_hash( use_cached_files, - client, layer["digest"].as_str().unwrap(), + client, + &config_layer.rootfs.diff_ids[layer_index].clone(), ) .await?, }; @@ -281,104 +281,107 @@ pub async fn get_image_layers( async fn get_verity_hash( use_cached_files: bool, - client: &containerd_client::Client, layer_digest: &str, + client: &containerd_client::Client, + diff_id: &str, ) -> Result { + let temp_dir = tempfile::tempdir_in(".")?; + let base_dir = temp_dir.path(); + let cache_file = "layers-cache.json"; // Use file names supported by both Linux and Windows. let file_name = str::replace(layer_digest, ":", "-"); + let mut decompressed_path = base_dir.join(file_name); + decompressed_path.set_extension("tar"); - let base_dir = std::path::Path::new("layers_cache"); - let verity_path = get_verity_path(base_dir, &file_name); + let mut compressed_path = decompressed_path.clone(); + compressed_path.set_extension("gz"); - if use_cached_files && verity_path.exists() { + let mut verity_hash = "".to_string(); + let mut error_message = "".to_string(); + let mut error = false; + + if use_cached_files { + verity_hash = read_verity_from_store(cache_file, diff_id)?; info!("Using cache file"); - } else if let Err(e) = create_verity_hash_file( - use_cached_files, - layer_digest, - base_dir, - &get_decompressed_path(&verity_path), - client, - ) - .await - { - delete_files(base_dir, &file_name).await; - bail!("{e}"); + info!("dm-verity root hash: {verity_hash}"); } - match std::fs::read_to_string(&verity_path) { - Err(e) => { - delete_files(base_dir, &file_name).await; - bail!("Failed to read {:?}, error {e}", &verity_path); + if verity_hash.is_empty() { + // go find verity hash if not found in cache + if let Err(e) = create_decompressed_layer_file( + client, + layer_digest, + &decompressed_path, + &compressed_path, + ) + .await + { + error = true; + error_message = format!("Failed to create verity hash for {layer_digest}, error {e}"); } - Ok(v) => { - if !use_cached_files { - let _ = std::fs::remove_dir_all(base_dir); + + if !error { + match get_verity_hash_value(&decompressed_path) { + Err(e) => { + error_message = format!("Failed to get verity hash {e}"); + error = true; + } + Ok(v) => { + verity_hash = v; + if use_cached_files { + add_verity_to_store(cache_file, diff_id, &verity_hash)?; + } + info!("dm-verity root hash: {verity_hash}"); + } } - info!("dm-verity root hash: {v}"); - Ok(v) } } -} - -async fn create_verity_hash_file( - use_cached_files: bool, - layer_digest: &str, - base_dir: &Path, - decompressed_path: &PathBuf, - client: &containerd_client::Client, -) -> Result<()> { - if use_cached_files && decompressed_path.exists() { - info!("Using cached file {:?}", &decompressed_path); - } else { - std::fs::create_dir_all(base_dir)?; - create_decompressed_layer_file(use_cached_files, layer_digest, decompressed_path, client) - .await?; + temp_dir.close()?; + if error { + // remove the cache file if we're using it + if use_cached_files { + std::fs::remove_file(cache_file)?; + } + warn!("{error_message}"); } - - do_create_verity_hash_file(decompressed_path) + Ok(verity_hash) } async fn create_decompressed_layer_file( - use_cached_files: bool, - layer_digest: &str, - decompressed_path: &PathBuf, client: &containerd_client::Client, + layer_digest: &str, + decompressed_path: &Path, + compressed_path: &Path, ) -> Result<()> { - let compressed_path = get_compressed_path(decompressed_path); + info!("Pulling layer {layer_digest}"); + let mut file = tokio::fs::File::create(&compressed_path) + .await + .map_err(|e| anyhow!(e))?; - if use_cached_files && compressed_path.exists() { - info!("Using cached file {:?}", &compressed_path); - } else { - info!("Pulling layer {layer_digest}"); - let mut file = tokio::fs::File::create(&compressed_path) - .await - .map_err(|e| anyhow!(e))?; + info!("Decompressing layer"); - info!("Decompressing layer"); + let req = containerd_client::services::v1::ReadContentRequest { + digest: layer_digest.to_string(), + offset: 0, + size: 0, + }; + let req = with_namespace!(req, "k8s.io"); + let mut c = client.content(); + let resp = c.read(req).await?; + let mut stream = resp.into_inner(); - let req = containerd_client::services::v1::ReadContentRequest { - digest: layer_digest.to_string(), - offset: 0, - size: 0, - }; - let req = with_namespace!(req, "k8s.io"); - let mut c = client.content(); - let resp = c.read(req).await?; - let mut stream = resp.into_inner(); - - while let Some(chunk) = stream.message().await? { - if chunk.offset < 0 { - print!("oop") - } - file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?; - file.write_all(&chunk.data).await?; + while let Some(chunk) = stream.message().await? { + if chunk.offset < 0 { + return Err(anyhow!("Too many Docker gzip layers")); } - - file.flush() - .await - .map_err(|e| anyhow!(e)) - .expect("Failed to flush file"); + file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?; + file.write_all(&chunk.data).await?; } + + file.flush() + .await + .map_err(|e| anyhow!(e)) + .expect("Failed to flush file"); let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?; let mut decompressed_file = std::fs::OpenOptions::new() .read(true)