Merge pull request #9530 from microsoft/saulparedes/improve_caching

genpolicy: changing caching so the tool can run concurrently with itself
Dan Mihai 2024-04-25 13:06:23 -07:00 committed by GitHub
commit b42ddaf15f
4 changed files with 254 additions and 211 deletions
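
Before this change, genpolicy cached each layer's dm-verity hash as a separate *.verity file (alongside *.tar and *.gz scratch files) in a shared layers_cache/ directory, which concurrent invocations could delete or overwrite under each other. The new code keeps scratch files in a per-run temporary directory (tempfile::tempdir_in(".")) and records diff_id to verity_hash mappings in a single layers-cache.json, serializing writers with an advisory file lock from the new fs2 dependency. A minimal sketch of that locking pattern (not the tool's exact code; the path and payload are placeholders):

use fs2::FileExt;
use std::fs::OpenOptions;
use std::io::Write;

fn append_line_locked(path: &str, line: &str) -> std::io::Result<()> {
    // Open (or create) the shared cache file.
    let file = OpenOptions::new().create(true).append(true).open(path)?;
    // Try the exclusive advisory lock without blocking first, then block,
    // mirroring the try_lock_exclusive / lock_exclusive fallback used below.
    if file.try_lock_exclusive().is_err() {
        eprintln!("Waiting to lock file: {path}");
        file.lock_exclusive()?;
    }
    writeln!(&file, "{line}")?;
    file.unlock()
}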


@@ -536,6 +536,16 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@@ -619,6 +629,7 @@ dependencies = [
"docker_credential",
"env_logger",
"flate2",
"fs2",
"generic-array",
"k8s-cri",
"log",


@@ -59,6 +59,7 @@ sha2 = "0.10.6"
tarindex = { git = "https://github.com/kata-containers/tardev-snapshotter", rev = "06183a5" }
tempfile = "3.5.0"
zerocopy = "0.6.1"
fs2 = "0.4.3"
# containerd image pull support
k8s-cri = "0.7.0"


@@ -8,20 +8,23 @@
use crate::containerd;
use crate::policy;
use crate::utils::Config;
use crate::verity;
use crate::utils::Config;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, Result};
use docker_credential::{CredentialRetrievalError, DockerCredential};
use log::warn;
use log::{debug, info, LevelFilter};
use oci_distribution::client::{linux_amd64_resolver, ClientConfig, ClientProtocol};
use oci_distribution::{manifest, secrets::RegistryAuth, Client, Reference};
use fs2::FileExt;
use log::{debug, info, warn, LevelFilter};
use oci_distribution::{
client::{linux_amd64_resolver, ClientConfig},
manifest,
secrets::RegistryAuth,
Client, Reference,
};
use serde::{Deserialize, Serialize};
use sha2::{digest::typenum::Unsigned, digest::OutputSizeUser, Sha256};
use std::io::{self, Seek, Write};
use std::path::{Path, PathBuf};
use tokio::{fs, io::AsyncWriteExt};
use std::{fs::OpenOptions, io, io::BufWriter, io::Seek, io::Write, path::Path};
use tokio::io::AsyncWriteExt;
/// Container image properties obtained from an OCI repository.
#[derive(Clone, Debug, Default)]
@@ -57,21 +60,20 @@ pub struct DockerRootfs {
}
/// This application's image layer properties.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImageLayer {
pub diff_id: String,
pub verity_hash: String,
}
impl Container {
pub async fn new(config: &Config, image: &str) -> Result<Self> {
pub async fn new(use_cached_files: bool, image: &str) -> Result<Self> {
info!("============================================");
info!("Pulling manifest and config for {:?}", image);
let reference: Reference = image.to_string().parse().unwrap();
let auth = build_auth(&reference);
let mut client = Client::new(ClientConfig {
protocol: ClientProtocol::HttpsExcept(config.insecure_registries.clone()),
platform_resolver: Some(Box::new(linux_amd64_resolver)),
..Default::default()
});
@@ -94,7 +96,7 @@ impl Container {
let config_layer: DockerConfigLayer =
serde_json::from_str(&config_layer_str).unwrap();
let image_layers = get_image_layers(
config.use_cache,
use_cached_files,
&mut client,
&reference,
&manifest,
@@ -248,6 +250,7 @@ async fn get_image_layers(
client,
reference,
&layer.digest,
&config_layer.rootfs.diff_ids[layer_index].clone(),
)
.await?,
});
@@ -262,125 +265,160 @@ async fn get_image_layers(
Ok(layers)
}
pub fn get_verity_path(base_dir: &Path, file_name: &str) -> PathBuf {
let mut verity_path: PathBuf = base_dir.join(file_name);
verity_path.set_extension("verity");
verity_path
}
pub fn get_decompressed_path(verity_path: &Path) -> PathBuf {
let mut decompressed_path = verity_path.to_path_buf().clone();
decompressed_path.set_extension("tar");
decompressed_path
}
pub fn get_compressed_path(decompressed_path: &Path) -> PathBuf {
let mut compressed_path = decompressed_path.to_path_buf().clone();
compressed_path.set_extension("gz");
compressed_path
}
pub async fn delete_files(base_dir: &Path, file_name: &str) {
let verity_path = get_verity_path(base_dir, file_name);
let _ = fs::remove_file(&verity_path).await;
let decompressed_path = get_decompressed_path(&verity_path);
let _ = fs::remove_file(&decompressed_path).await;
let compressed_path = get_compressed_path(&decompressed_path);
let _ = fs::remove_file(&compressed_path).await;
}
async fn get_verity_hash(
use_cached_files: bool,
client: &mut Client,
reference: &Reference,
layer_digest: &str,
diff_id: &str,
) -> Result<String> {
let temp_dir = tempfile::tempdir_in(".")?;
let base_dir = temp_dir.path();
let cache_file = "layers-cache.json";
// Use file names supported by both Linux and Windows.
let file_name = str::replace(layer_digest, ":", "-");
let mut decompressed_path = base_dir.join(file_name);
decompressed_path.set_extension("tar");
let base_dir = std::path::Path::new("layers_cache");
let verity_path = get_verity_path(base_dir, &file_name);
let mut compressed_path = decompressed_path.clone();
compressed_path.set_extension("gz");
if use_cached_files && verity_path.exists() {
info!("Using cached file {:?}", &verity_path);
} else if let Err(e) = create_verity_hash_file(
use_cached_files,
client,
reference,
layer_digest,
base_dir,
&get_decompressed_path(&verity_path),
)
.await
{
delete_files(base_dir, &file_name).await;
bail!("{e}");
let mut verity_hash = "".to_string();
let mut error_message = "".to_string();
let mut error = false;
// get value from store and return if it exists
if use_cached_files {
verity_hash = read_verity_from_store(cache_file, diff_id)?;
info!("Using cache file");
info!("dm-verity root hash: {verity_hash}");
}
match std::fs::read_to_string(&verity_path) {
Err(e) => {
delete_files(base_dir, &file_name).await;
bail!("Failed to read {:?}, error {e}", &verity_path);
}
Ok(v) => {
if !use_cached_files {
let _ = std::fs::remove_dir_all(base_dir);
}
info!("dm-verity root hash: {v}");
Ok(v)
}
}
}
async fn create_verity_hash_file(
use_cached_files: bool,
client: &mut Client,
reference: &Reference,
layer_digest: &str,
base_dir: &Path,
decompressed_path: &PathBuf,
) -> Result<()> {
if use_cached_files && decompressed_path.exists() {
info!("Using cached file {:?}", &decompressed_path);
} else {
std::fs::create_dir_all(base_dir)?;
create_decompressed_layer_file(
use_cached_files,
// create the layer files
if verity_hash.is_empty() {
if let Err(e) = create_decompressed_layer_file(
client,
reference,
layer_digest,
decompressed_path,
&decompressed_path,
&compressed_path,
)
.await?;
.await
{
error_message = format!("Failed to create verity hash for {layer_digest}, error {e}");
error = true
};
if !error {
match get_verity_hash_value(&decompressed_path) {
Err(e) => {
error_message = format!("Failed to get verity hash {e}");
error = true;
}
Ok(v) => {
verity_hash = v;
if use_cached_files {
add_verity_to_store(cache_file, diff_id, &verity_hash)?;
}
info!("dm-verity root hash: {verity_hash}");
}
}
}
}
do_create_verity_hash_file(decompressed_path)
temp_dir.close()?;
if error {
// remove the cache file if we're using it
if use_cached_files {
std::fs::remove_file(cache_file)?;
}
warn!("{error_message}");
}
Ok(verity_hash)
}
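
Unlike the old code, the rewritten get_verity_hash no longer bails out on failure: a cache hit skips the layer pull entirely; a miss pulls and decompresses the layer into the per-run temp directory, computes the hash, and appends it to the cache; and, when caching is enabled, any error removes the shared cache file (in case it was left inconsistent), logs a warning, and yields an empty hash instead of an error. A sketch of that control flow only, with stand-in closures in place of read_verity_from_store, the pull-and-hash steps, and add_verity_to_store:

fn cached_or_computed(use_cache: bool, diff_id: &str) -> String {
    let lookup = |_id: &str| -> String { String::new() };                      // stand-in
    let compute = |_id: &str| -> Result<String, String> { Ok("hash".into()) }; // stand-in
    let store = |_id: &str, _hash: &str| {};                                   // stand-in

    let mut verity_hash = if use_cache { lookup(diff_id) } else { String::new() };
    if verity_hash.is_empty() {
        match compute(diff_id) {
            Ok(hash) => {
                verity_hash = hash;
                if use_cache {
                    store(diff_id, &verity_hash); // appended under the file lock
                }
            }
            // On failure: remove the cache file, warn, and leave the hash empty.
            Err(_) => {}
        }
    }
    verity_hash
}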
// the store is a json file that matches layer hashes to verity hashes
pub fn add_verity_to_store(cache_file: &str, diff_id: &str, verity_hash: &str) -> Result<()> {
// open the json file in read mode, create it if it doesn't exist
let read_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(cache_file)?;
let mut data: Vec<ImageLayer> = if let Ok(vec) = serde_json::from_reader(read_file) {
vec
} else {
// Delete the malformed file here if it's present
Vec::new()
};
// Add new data to the deserialized JSON
data.push(ImageLayer {
diff_id: diff_id.to_string(),
verity_hash: verity_hash.to_string(),
});
// Serialize in pretty format
let serialized = serde_json::to_string_pretty(&data)?;
// Open the JSON file to write
let file = OpenOptions::new().write(true).open(cache_file)?;
// try to lock the file, if it fails, get the error
let result = file.try_lock_exclusive();
if result.is_err() {
warn!("Waiting to lock file: {cache_file}");
file.lock_exclusive()?;
}
// Write the serialized JSON to the file
let mut writer = BufWriter::new(&file);
writeln!(writer, "{}", serialized)?;
writer.flush()?;
file.unlock()?;
Ok(())
}
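
add_verity_to_store serializes the full list of ImageLayer entries with serde_json::to_string_pretty and writes it back under an exclusive fs2 lock, so layers-cache.json is simply a pretty-printed JSON array of {diff_id, verity_hash} records. A self-contained round trip showing that on-disk shape (the digest and hash values are made up):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct ImageLayer {
    diff_id: String,
    verity_hash: String,
}

fn main() -> serde_json::Result<()> {
    let entries = vec![ImageLayer {
        diff_id: "sha256:0000aaaa".into(), // made-up diff_id
        verity_hash: "1111bbbb".into(),    // made-up dm-verity root hash
    }];
    // Produces the same pretty format that add_verity_to_store writes:
    // [
    //   {
    //     "diff_id": "sha256:0000aaaa",
    //     "verity_hash": "1111bbbb"
    //   }
    // ]
    let serialized = serde_json::to_string_pretty(&entries)?;
    let parsed: Vec<ImageLayer> = serde_json::from_str(&serialized)?;
    assert_eq!(parsed.len(), 1);
    Ok(())
}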
// helper function to read the verity hash from the store
// returns empty string if not found or file does not exist
pub fn read_verity_from_store(cache_file: &str, diff_id: &str) -> Result<String> {
match OpenOptions::new().read(true).open(cache_file) {
Ok(file) => match serde_json::from_reader(file) {
Result::<Vec<ImageLayer>, _>::Ok(layers) => {
for layer in layers {
if layer.diff_id == diff_id {
return Ok(layer.verity_hash);
}
}
}
Err(e) => {
warn!("read_verity_from_store: failed to read cached image layers: {e}");
}
},
Err(e) => {
info!("read_verity_from_store: failed to open cache file: {e}");
}
}
Ok(String::new())
}
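
Both helpers treat a missing or malformed cache file as "no cached entries" rather than an error: read_verity_from_store logs and returns an empty string, and add_verity_to_store starts over from an empty Vec. The same tolerant-parse pattern in isolation (the path argument is a placeholder):

use std::fs::File;

// A missing, empty, or malformed JSON file simply yields an empty list.
fn load_entries_or_default(path: &str) -> Vec<serde_json::Value> {
    File::open(path)
        .ok()
        .and_then(|file| serde_json::from_reader(file).ok())
        .unwrap_or_default()
}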
async fn create_decompressed_layer_file(
use_cached_files: bool,
client: &mut Client,
reference: &Reference,
layer_digest: &str,
decompressed_path: &PathBuf,
decompressed_path: &Path,
compressed_path: &Path,
) -> Result<()> {
let compressed_path = get_compressed_path(decompressed_path);
if use_cached_files && compressed_path.exists() {
info!("Using cached file {:?}", &compressed_path);
} else {
info!("Pulling layer {layer_digest}");
let mut file = tokio::fs::File::create(&compressed_path)
.await
.map_err(|e| anyhow!(e))?;
client
.pull_blob(reference, layer_digest, &mut file)
.await
.map_err(|e| anyhow!(e))?;
file.flush().await.map_err(|e| anyhow!(e))?;
}
info!("Pulling layer {:?}", layer_digest);
let mut file = tokio::fs::File::create(&compressed_path)
.await
.map_err(|e| anyhow!(e))?;
client
.pull_blob(reference, layer_digest, &mut file)
.await
.map_err(|e| anyhow!(e))?;
file.flush().await.map_err(|e| anyhow!(e))?;
info!("Decompressing layer");
let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?;
@@ -401,15 +439,12 @@ async fn create_decompressed_layer_file(
Ok(())
}
pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> {
pub fn get_verity_hash_value(path: &Path) -> Result<String> {
info!("Calculating dm-verity root hash");
let mut file = std::fs::File::open(decompressed_path)?;
let mut file = std::fs::File::open(path)?;
let size = file.seek(std::io::SeekFrom::End(0))?;
if size < 4096 {
return Err(anyhow!(
"Block device {:?} is too small: {size}",
&decompressed_path
));
return Err(anyhow!("Block device {:?} is too small: {size}", &path));
}
let salt = [0u8; <Sha256 as OutputSizeUser>::OutputSize::USIZE];
@@ -417,21 +452,14 @@ pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> {
let hash = verity::traverse_file(&mut file, 0, false, v, &mut verity::no_write)?;
let result = format!("{:x}", hash);
let mut verity_path = decompressed_path.clone();
verity_path.set_extension("verity");
let mut verity_file = std::fs::File::create(verity_path).map_err(|e| anyhow!(e))?;
verity_file
.write_all(result.as_bytes())
.map_err(|e| anyhow!(e))?;
verity_file.flush().map_err(|e| anyhow!(e))?;
Ok(())
Ok(result)
}
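
get_verity_hash_value hashes the decompressed tar (rejecting files smaller than 4096 bytes) with an all-zero salt whose length equals the Sha256 digest size. For reference, that size constant resolves to 32 bytes with the sha2 0.10 API:

use sha2::{digest::typenum::Unsigned, digest::OutputSizeUser, Sha256};

fn main() {
    // The salt above is therefore [0u8; 32]: Sha256 produces a 32-byte digest.
    assert_eq!(<Sha256 as OutputSizeUser>::OutputSize::USIZE, 32);
}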
pub async fn get_container(config: &Config, image: &str) -> Result<Container> {
if let Some(socket_path) = &config.containerd_socket_path {
return Container::new_containerd_pull(config.use_cache, image, socket_path).await;
}
Container::new(config, image).await
Container::new(config.use_cache, image).await
}
fn build_auth(reference: &Reference) -> RegistryAuth {


@@ -6,23 +6,22 @@
// Allow Docker image config field names.
#![allow(non_snake_case)]
use crate::registry::{
delete_files, do_create_verity_hash_file, get_compressed_path, get_decompressed_path,
get_verity_path, Container, DockerConfigLayer, ImageLayer,
add_verity_to_store, get_verity_hash_value, read_verity_from_store, Container,
DockerConfigLayer, ImageLayer,
};
use anyhow::{anyhow, bail, Result};
use containerd_client::services::v1::GetImageRequest;
use containerd_client::with_namespace;
use anyhow::{anyhow, Result};
use containerd_client::{services::v1::GetImageRequest, with_namespace};
use docker_credential::{CredentialRetrievalError, DockerCredential};
use k8s_cri::v1::{image_service_client::ImageServiceClient, AuthConfig};
use log::warn;
use log::{debug, info};
use log::{debug, info, warn};
use oci_distribution::Reference;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::{io::Seek, io::Write, path::Path, path::PathBuf};
use tokio::io;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::net::UnixStream;
use std::{collections::HashMap, convert::TryFrom, io::Seek, io::Write, path::Path};
use tokio::{
io,
io::{AsyncSeekExt, AsyncWriteExt},
net::UnixStream,
};
use tonic::transport::{Endpoint, Uri};
use tonic::Request;
use tower::service_fn;
@@ -263,8 +262,9 @@ pub async fn get_image_layers(
diff_id: config_layer.rootfs.diff_ids[layer_index].clone(),
verity_hash: get_verity_hash(
use_cached_files,
client,
layer["digest"].as_str().unwrap(),
client,
&config_layer.rootfs.diff_ids[layer_index].clone(),
)
.await?,
};
@@ -281,104 +281,107 @@ pub async fn get_image_layers(
async fn get_verity_hash(
use_cached_files: bool,
client: &containerd_client::Client,
layer_digest: &str,
client: &containerd_client::Client,
diff_id: &str,
) -> Result<String> {
let temp_dir = tempfile::tempdir_in(".")?;
let base_dir = temp_dir.path();
let cache_file = "layers-cache.json";
// Use file names supported by both Linux and Windows.
let file_name = str::replace(layer_digest, ":", "-");
let mut decompressed_path = base_dir.join(file_name);
decompressed_path.set_extension("tar");
let base_dir = std::path::Path::new("layers_cache");
let verity_path = get_verity_path(base_dir, &file_name);
let mut compressed_path = decompressed_path.clone();
compressed_path.set_extension("gz");
if use_cached_files && verity_path.exists() {
let mut verity_hash = "".to_string();
let mut error_message = "".to_string();
let mut error = false;
if use_cached_files {
verity_hash = read_verity_from_store(cache_file, diff_id)?;
info!("Using cache file");
} else if let Err(e) = create_verity_hash_file(
use_cached_files,
layer_digest,
base_dir,
&get_decompressed_path(&verity_path),
client,
)
.await
{
delete_files(base_dir, &file_name).await;
bail!("{e}");
info!("dm-verity root hash: {verity_hash}");
}
match std::fs::read_to_string(&verity_path) {
Err(e) => {
delete_files(base_dir, &file_name).await;
bail!("Failed to read {:?}, error {e}", &verity_path);
if verity_hash.is_empty() {
// go find verity hash if not found in cache
if let Err(e) = create_decompressed_layer_file(
client,
layer_digest,
&decompressed_path,
&compressed_path,
)
.await
{
error = true;
error_message = format!("Failed to create verity hash for {layer_digest}, error {e}");
}
Ok(v) => {
if !use_cached_files {
let _ = std::fs::remove_dir_all(base_dir);
if !error {
match get_verity_hash_value(&decompressed_path) {
Err(e) => {
error_message = format!("Failed to get verity hash {e}");
error = true;
}
Ok(v) => {
verity_hash = v;
if use_cached_files {
add_verity_to_store(cache_file, diff_id, &verity_hash)?;
}
info!("dm-verity root hash: {verity_hash}");
}
}
info!("dm-verity root hash: {v}");
Ok(v)
}
}
}
async fn create_verity_hash_file(
use_cached_files: bool,
layer_digest: &str,
base_dir: &Path,
decompressed_path: &PathBuf,
client: &containerd_client::Client,
) -> Result<()> {
if use_cached_files && decompressed_path.exists() {
info!("Using cached file {:?}", &decompressed_path);
} else {
std::fs::create_dir_all(base_dir)?;
create_decompressed_layer_file(use_cached_files, layer_digest, decompressed_path, client)
.await?;
temp_dir.close()?;
if error {
// remove the cache file if we're using it
if use_cached_files {
std::fs::remove_file(cache_file)?;
}
warn!("{error_message}");
}
do_create_verity_hash_file(decompressed_path)
Ok(verity_hash)
}
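
The containerd-backed path keeps the same cache protocol: it keys the shared layers-cache.json by the image config's diff_ids and appends through the same locked add_verity_to_store, so concurrent runs coordinate regardless of which pull backend they use. The create_decompressed_layer_file below streams the compressed blob out of containerd's content store as offset-tagged chunks and reassembles it by seeking; a simplified, stand-alone version of that loop (the Chunk type and error message are stand-ins, not the crate's types):

use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom};

// Stand-in for containerd's ReadContentResponse messages: each chunk carries
// its data plus the file offset it belongs at.
struct Chunk {
    offset: i64,
    data: Vec<u8>,
}

async fn write_chunks(path: &str, chunks: Vec<Chunk>) -> anyhow::Result<()> {
    let mut file = tokio::fs::File::create(path).await?;
    for chunk in chunks {
        if chunk.offset < 0 {
            anyhow::bail!("negative offset in content stream");
        }
        file.seek(SeekFrom::Start(chunk.offset as u64)).await?;
        file.write_all(&chunk.data).await?;
    }
    file.flush().await?;
    Ok(())
}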
async fn create_decompressed_layer_file(
use_cached_files: bool,
layer_digest: &str,
decompressed_path: &PathBuf,
client: &containerd_client::Client,
layer_digest: &str,
decompressed_path: &Path,
compressed_path: &Path,
) -> Result<()> {
let compressed_path = get_compressed_path(decompressed_path);
info!("Pulling layer {layer_digest}");
let mut file = tokio::fs::File::create(&compressed_path)
.await
.map_err(|e| anyhow!(e))?;
if use_cached_files && compressed_path.exists() {
info!("Using cached file {:?}", &compressed_path);
} else {
info!("Pulling layer {layer_digest}");
let mut file = tokio::fs::File::create(&compressed_path)
.await
.map_err(|e| anyhow!(e))?;
info!("Decompressing layer");
info!("Decompressing layer");
let req = containerd_client::services::v1::ReadContentRequest {
digest: layer_digest.to_string(),
offset: 0,
size: 0,
};
let req = with_namespace!(req, "k8s.io");
let mut c = client.content();
let resp = c.read(req).await?;
let mut stream = resp.into_inner();
let req = containerd_client::services::v1::ReadContentRequest {
digest: layer_digest.to_string(),
offset: 0,
size: 0,
};
let req = with_namespace!(req, "k8s.io");
let mut c = client.content();
let resp = c.read(req).await?;
let mut stream = resp.into_inner();
while let Some(chunk) = stream.message().await? {
if chunk.offset < 0 {
print!("oop")
}
file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?;
file.write_all(&chunk.data).await?;
while let Some(chunk) = stream.message().await? {
if chunk.offset < 0 {
return Err(anyhow!("Too many Docker gzip layers"));
}
file.flush()
.await
.map_err(|e| anyhow!(e))
.expect("Failed to flush file");
file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?;
file.write_all(&chunk.data).await?;
}
file.flush()
.await
.map_err(|e| anyhow!(e))
.expect("Failed to flush file");
let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?;
let mut decompressed_file = std::fs::OpenOptions::new()
.read(true)