mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-30 14:25:43 +00:00
genpolicy: changing caching so the tool can run
concurrently with itself Based on 3a1461b0a5186a92afedaaea33ff2bd120d1cea0 Previously the tool would use the layers_cache folder for all instances and hence delete the cache when it was done, interfereing with other instances. This change makes it so that each instance of the tool will have its own temp folder to use. Signed-off-by: Saul Paredes <saulparedes@microsoft.com>
This commit is contained in:
parent
7e12d588c0
commit
2149cb6502
11
src/tools/genpolicy/Cargo.lock
generated
11
src/tools/genpolicy/Cargo.lock
generated
@ -536,6 +536,16 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs2"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.28"
|
||||
@ -619,6 +629,7 @@ dependencies = [
|
||||
"docker_credential",
|
||||
"env_logger",
|
||||
"flate2",
|
||||
"fs2",
|
||||
"generic-array",
|
||||
"k8s-cri",
|
||||
"log",
|
||||
|
@ -59,6 +59,7 @@ sha2 = "0.10.6"
|
||||
tarindex = { git = "https://github.com/kata-containers/tardev-snapshotter", rev = "06183a5" }
|
||||
tempfile = "3.5.0"
|
||||
zerocopy = "0.6.1"
|
||||
fs2 = "0.4.3"
|
||||
|
||||
# containerd image pull support
|
||||
k8s-cri = "0.7.0"
|
||||
|
@ -8,20 +8,23 @@
|
||||
|
||||
use crate::containerd;
|
||||
use crate::policy;
|
||||
use crate::utils::Config;
|
||||
use crate::verity;
|
||||
|
||||
use crate::utils::Config;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use anyhow::{anyhow, Result};
|
||||
use docker_credential::{CredentialRetrievalError, DockerCredential};
|
||||
use log::warn;
|
||||
use log::{debug, info, LevelFilter};
|
||||
use oci_distribution::client::{linux_amd64_resolver, ClientConfig, ClientProtocol};
|
||||
use oci_distribution::{manifest, secrets::RegistryAuth, Client, Reference};
|
||||
use fs2::FileExt;
|
||||
use log::{debug, info, warn, LevelFilter};
|
||||
use oci_distribution::{
|
||||
client::{linux_amd64_resolver, ClientConfig},
|
||||
manifest,
|
||||
secrets::RegistryAuth,
|
||||
Client, Reference,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{digest::typenum::Unsigned, digest::OutputSizeUser, Sha256};
|
||||
use std::io::{self, Seek, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::{fs, io::AsyncWriteExt};
|
||||
use std::{fs::OpenOptions, io, io::BufWriter, io::Seek, io::Write, path::Path};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
/// Container image properties obtained from an OCI repository.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@ -57,21 +60,20 @@ pub struct DockerRootfs {
|
||||
}
|
||||
|
||||
/// This application's image layer properties.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ImageLayer {
|
||||
pub diff_id: String,
|
||||
pub verity_hash: String,
|
||||
}
|
||||
|
||||
impl Container {
|
||||
pub async fn new(config: &Config, image: &str) -> Result<Self> {
|
||||
pub async fn new(use_cached_files: bool, image: &str) -> Result<Self> {
|
||||
info!("============================================");
|
||||
info!("Pulling manifest and config for {:?}", image);
|
||||
let reference: Reference = image.to_string().parse().unwrap();
|
||||
let auth = build_auth(&reference);
|
||||
|
||||
let mut client = Client::new(ClientConfig {
|
||||
protocol: ClientProtocol::HttpsExcept(config.insecure_registries.clone()),
|
||||
platform_resolver: Some(Box::new(linux_amd64_resolver)),
|
||||
..Default::default()
|
||||
});
|
||||
@ -94,7 +96,7 @@ impl Container {
|
||||
let config_layer: DockerConfigLayer =
|
||||
serde_json::from_str(&config_layer_str).unwrap();
|
||||
let image_layers = get_image_layers(
|
||||
config.use_cache,
|
||||
use_cached_files,
|
||||
&mut client,
|
||||
&reference,
|
||||
&manifest,
|
||||
@ -248,6 +250,7 @@ async fn get_image_layers(
|
||||
client,
|
||||
reference,
|
||||
&layer.digest,
|
||||
&config_layer.rootfs.diff_ids[layer_index].clone(),
|
||||
)
|
||||
.await?,
|
||||
});
|
||||
@ -262,125 +265,160 @@ async fn get_image_layers(
|
||||
Ok(layers)
|
||||
}
|
||||
|
||||
pub fn get_verity_path(base_dir: &Path, file_name: &str) -> PathBuf {
|
||||
let mut verity_path: PathBuf = base_dir.join(file_name);
|
||||
verity_path.set_extension("verity");
|
||||
verity_path
|
||||
}
|
||||
|
||||
pub fn get_decompressed_path(verity_path: &Path) -> PathBuf {
|
||||
let mut decompressed_path = verity_path.to_path_buf().clone();
|
||||
decompressed_path.set_extension("tar");
|
||||
decompressed_path
|
||||
}
|
||||
|
||||
pub fn get_compressed_path(decompressed_path: &Path) -> PathBuf {
|
||||
let mut compressed_path = decompressed_path.to_path_buf().clone();
|
||||
compressed_path.set_extension("gz");
|
||||
compressed_path
|
||||
}
|
||||
|
||||
pub async fn delete_files(base_dir: &Path, file_name: &str) {
|
||||
let verity_path = get_verity_path(base_dir, file_name);
|
||||
let _ = fs::remove_file(&verity_path).await;
|
||||
|
||||
let decompressed_path = get_decompressed_path(&verity_path);
|
||||
let _ = fs::remove_file(&decompressed_path).await;
|
||||
|
||||
let compressed_path = get_compressed_path(&decompressed_path);
|
||||
let _ = fs::remove_file(&compressed_path).await;
|
||||
}
|
||||
|
||||
async fn get_verity_hash(
|
||||
use_cached_files: bool,
|
||||
client: &mut Client,
|
||||
reference: &Reference,
|
||||
layer_digest: &str,
|
||||
diff_id: &str,
|
||||
) -> Result<String> {
|
||||
let temp_dir = tempfile::tempdir_in(".")?;
|
||||
let base_dir = temp_dir.path();
|
||||
let cache_file = "layers-cache.json";
|
||||
// Use file names supported by both Linux and Windows.
|
||||
let file_name = str::replace(layer_digest, ":", "-");
|
||||
let mut decompressed_path = base_dir.join(file_name);
|
||||
decompressed_path.set_extension("tar");
|
||||
|
||||
let base_dir = std::path::Path::new("layers_cache");
|
||||
let verity_path = get_verity_path(base_dir, &file_name);
|
||||
let mut compressed_path = decompressed_path.clone();
|
||||
compressed_path.set_extension("gz");
|
||||
|
||||
if use_cached_files && verity_path.exists() {
|
||||
info!("Using cached file {:?}", &verity_path);
|
||||
} else if let Err(e) = create_verity_hash_file(
|
||||
use_cached_files,
|
||||
client,
|
||||
reference,
|
||||
layer_digest,
|
||||
base_dir,
|
||||
&get_decompressed_path(&verity_path),
|
||||
)
|
||||
.await
|
||||
{
|
||||
delete_files(base_dir, &file_name).await;
|
||||
bail!("{e}");
|
||||
let mut verity_hash = "".to_string();
|
||||
let mut error_message = "".to_string();
|
||||
let mut error = false;
|
||||
|
||||
// get value from store and return if it exists
|
||||
if use_cached_files {
|
||||
verity_hash = read_verity_from_store(cache_file, diff_id)?;
|
||||
info!("Using cache file");
|
||||
info!("dm-verity root hash: {verity_hash}");
|
||||
}
|
||||
|
||||
match std::fs::read_to_string(&verity_path) {
|
||||
Err(e) => {
|
||||
delete_files(base_dir, &file_name).await;
|
||||
bail!("Failed to read {:?}, error {e}", &verity_path);
|
||||
}
|
||||
Ok(v) => {
|
||||
if !use_cached_files {
|
||||
let _ = std::fs::remove_dir_all(base_dir);
|
||||
}
|
||||
info!("dm-verity root hash: {v}");
|
||||
Ok(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_verity_hash_file(
|
||||
use_cached_files: bool,
|
||||
client: &mut Client,
|
||||
reference: &Reference,
|
||||
layer_digest: &str,
|
||||
base_dir: &Path,
|
||||
decompressed_path: &PathBuf,
|
||||
) -> Result<()> {
|
||||
if use_cached_files && decompressed_path.exists() {
|
||||
info!("Using cached file {:?}", &decompressed_path);
|
||||
} else {
|
||||
std::fs::create_dir_all(base_dir)?;
|
||||
create_decompressed_layer_file(
|
||||
use_cached_files,
|
||||
// create the layer files
|
||||
if verity_hash.is_empty() {
|
||||
if let Err(e) = create_decompressed_layer_file(
|
||||
client,
|
||||
reference,
|
||||
layer_digest,
|
||||
decompressed_path,
|
||||
&decompressed_path,
|
||||
&compressed_path,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
error_message = format!("Failed to create verity hash for {layer_digest}, error {e}");
|
||||
error = true
|
||||
};
|
||||
|
||||
if !error {
|
||||
match get_verity_hash_value(&decompressed_path) {
|
||||
Err(e) => {
|
||||
error_message = format!("Failed to get verity hash {e}");
|
||||
error = true;
|
||||
}
|
||||
Ok(v) => {
|
||||
verity_hash = v;
|
||||
if use_cached_files {
|
||||
add_verity_to_store(cache_file, diff_id, &verity_hash)?;
|
||||
}
|
||||
info!("dm-verity root hash: {verity_hash}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do_create_verity_hash_file(decompressed_path)
|
||||
temp_dir.close()?;
|
||||
if error {
|
||||
// remove the cache file if we're using it
|
||||
if use_cached_files {
|
||||
std::fs::remove_file(cache_file)?;
|
||||
}
|
||||
warn!("{error_message}");
|
||||
}
|
||||
Ok(verity_hash)
|
||||
}
|
||||
|
||||
// the store is a json file that matches layer hashes to verity hashes
|
||||
pub fn add_verity_to_store(cache_file: &str, diff_id: &str, verity_hash: &str) -> Result<()> {
|
||||
// open the json file in read mode, create it if it doesn't exist
|
||||
let read_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(cache_file)?;
|
||||
|
||||
let mut data: Vec<ImageLayer> = if let Ok(vec) = serde_json::from_reader(read_file) {
|
||||
vec
|
||||
} else {
|
||||
// Delete the malformed file here if it's present
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// Add new data to the deserialized JSON
|
||||
data.push(ImageLayer {
|
||||
diff_id: diff_id.to_string(),
|
||||
verity_hash: verity_hash.to_string(),
|
||||
});
|
||||
|
||||
// Serialize in pretty format
|
||||
let serialized = serde_json::to_string_pretty(&data)?;
|
||||
|
||||
// Open the JSON file to write
|
||||
let file = OpenOptions::new().write(true).open(cache_file)?;
|
||||
|
||||
// try to lock the file, if it fails, get the error
|
||||
let result = file.try_lock_exclusive();
|
||||
if result.is_err() {
|
||||
warn!("Waiting to lock file: {cache_file}");
|
||||
file.lock_exclusive()?;
|
||||
}
|
||||
// Write the serialized JSON to the file
|
||||
let mut writer = BufWriter::new(&file);
|
||||
writeln!(writer, "{}", serialized)?;
|
||||
writer.flush()?;
|
||||
file.unlock()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// helper function to read the verity hash from the store
|
||||
// returns empty string if not found or file does not exist
|
||||
pub fn read_verity_from_store(cache_file: &str, diff_id: &str) -> Result<String> {
|
||||
match OpenOptions::new().read(true).open(cache_file) {
|
||||
Ok(file) => match serde_json::from_reader(file) {
|
||||
Result::<Vec<ImageLayer>, _>::Ok(layers) => {
|
||||
for layer in layers {
|
||||
if layer.diff_id == diff_id {
|
||||
return Ok(layer.verity_hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("read_verity_from_store: failed to read cached image layers: {e}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
info!("read_verity_from_store: failed to open cache file: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(String::new())
|
||||
}
|
||||
|
||||
async fn create_decompressed_layer_file(
|
||||
use_cached_files: bool,
|
||||
client: &mut Client,
|
||||
reference: &Reference,
|
||||
layer_digest: &str,
|
||||
decompressed_path: &PathBuf,
|
||||
decompressed_path: &Path,
|
||||
compressed_path: &Path,
|
||||
) -> Result<()> {
|
||||
let compressed_path = get_compressed_path(decompressed_path);
|
||||
|
||||
if use_cached_files && compressed_path.exists() {
|
||||
info!("Using cached file {:?}", &compressed_path);
|
||||
} else {
|
||||
info!("Pulling layer {layer_digest}");
|
||||
let mut file = tokio::fs::File::create(&compressed_path)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
client
|
||||
.pull_blob(reference, layer_digest, &mut file)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
file.flush().await.map_err(|e| anyhow!(e))?;
|
||||
}
|
||||
info!("Pulling layer {:?}", layer_digest);
|
||||
let mut file = tokio::fs::File::create(&compressed_path)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
client
|
||||
.pull_blob(reference, layer_digest, &mut file)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
file.flush().await.map_err(|e| anyhow!(e))?;
|
||||
|
||||
info!("Decompressing layer");
|
||||
let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?;
|
||||
@ -401,15 +439,12 @@ async fn create_decompressed_layer_file(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> {
|
||||
pub fn get_verity_hash_value(path: &Path) -> Result<String> {
|
||||
info!("Calculating dm-verity root hash");
|
||||
let mut file = std::fs::File::open(decompressed_path)?;
|
||||
let mut file = std::fs::File::open(path)?;
|
||||
let size = file.seek(std::io::SeekFrom::End(0))?;
|
||||
if size < 4096 {
|
||||
return Err(anyhow!(
|
||||
"Block device {:?} is too small: {size}",
|
||||
&decompressed_path
|
||||
));
|
||||
return Err(anyhow!("Block device {:?} is too small: {size}", &path));
|
||||
}
|
||||
|
||||
let salt = [0u8; <Sha256 as OutputSizeUser>::OutputSize::USIZE];
|
||||
@ -417,21 +452,14 @@ pub fn do_create_verity_hash_file(decompressed_path: &PathBuf) -> Result<()> {
|
||||
let hash = verity::traverse_file(&mut file, 0, false, v, &mut verity::no_write)?;
|
||||
let result = format!("{:x}", hash);
|
||||
|
||||
let mut verity_path = decompressed_path.clone();
|
||||
verity_path.set_extension("verity");
|
||||
let mut verity_file = std::fs::File::create(verity_path).map_err(|e| anyhow!(e))?;
|
||||
verity_file
|
||||
.write_all(result.as_bytes())
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
verity_file.flush().map_err(|e| anyhow!(e))?;
|
||||
|
||||
Ok(())
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn get_container(config: &Config, image: &str) -> Result<Container> {
|
||||
if let Some(socket_path) = &config.containerd_socket_path {
|
||||
return Container::new_containerd_pull(config.use_cache, image, socket_path).await;
|
||||
}
|
||||
Container::new(config, image).await
|
||||
Container::new(config.use_cache, image).await
|
||||
}
|
||||
|
||||
fn build_auth(reference: &Reference) -> RegistryAuth {
|
||||
|
@ -6,23 +6,22 @@
|
||||
// Allow Docker image config field names.
|
||||
#![allow(non_snake_case)]
|
||||
use crate::registry::{
|
||||
delete_files, do_create_verity_hash_file, get_compressed_path, get_decompressed_path,
|
||||
get_verity_path, Container, DockerConfigLayer, ImageLayer,
|
||||
add_verity_to_store, get_verity_hash_value, read_verity_from_store, Container,
|
||||
DockerConfigLayer, ImageLayer,
|
||||
};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use containerd_client::services::v1::GetImageRequest;
|
||||
use containerd_client::with_namespace;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use containerd_client::{services::v1::GetImageRequest, with_namespace};
|
||||
use docker_credential::{CredentialRetrievalError, DockerCredential};
|
||||
use k8s_cri::v1::{image_service_client::ImageServiceClient, AuthConfig};
|
||||
use log::warn;
|
||||
use log::{debug, info};
|
||||
use log::{debug, info, warn};
|
||||
use oci_distribution::Reference;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::{io::Seek, io::Write, path::Path, path::PathBuf};
|
||||
use tokio::io;
|
||||
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio::net::UnixStream;
|
||||
use std::{collections::HashMap, convert::TryFrom, io::Seek, io::Write, path::Path};
|
||||
use tokio::{
|
||||
io,
|
||||
io::{AsyncSeekExt, AsyncWriteExt},
|
||||
net::UnixStream,
|
||||
};
|
||||
use tonic::transport::{Endpoint, Uri};
|
||||
use tonic::Request;
|
||||
use tower::service_fn;
|
||||
@ -263,8 +262,9 @@ pub async fn get_image_layers(
|
||||
diff_id: config_layer.rootfs.diff_ids[layer_index].clone(),
|
||||
verity_hash: get_verity_hash(
|
||||
use_cached_files,
|
||||
client,
|
||||
layer["digest"].as_str().unwrap(),
|
||||
client,
|
||||
&config_layer.rootfs.diff_ids[layer_index].clone(),
|
||||
)
|
||||
.await?,
|
||||
};
|
||||
@ -281,104 +281,107 @@ pub async fn get_image_layers(
|
||||
|
||||
async fn get_verity_hash(
|
||||
use_cached_files: bool,
|
||||
client: &containerd_client::Client,
|
||||
layer_digest: &str,
|
||||
client: &containerd_client::Client,
|
||||
diff_id: &str,
|
||||
) -> Result<String> {
|
||||
let temp_dir = tempfile::tempdir_in(".")?;
|
||||
let base_dir = temp_dir.path();
|
||||
let cache_file = "layers-cache.json";
|
||||
// Use file names supported by both Linux and Windows.
|
||||
let file_name = str::replace(layer_digest, ":", "-");
|
||||
let mut decompressed_path = base_dir.join(file_name);
|
||||
decompressed_path.set_extension("tar");
|
||||
|
||||
let base_dir = std::path::Path::new("layers_cache");
|
||||
let verity_path = get_verity_path(base_dir, &file_name);
|
||||
let mut compressed_path = decompressed_path.clone();
|
||||
compressed_path.set_extension("gz");
|
||||
|
||||
if use_cached_files && verity_path.exists() {
|
||||
let mut verity_hash = "".to_string();
|
||||
let mut error_message = "".to_string();
|
||||
let mut error = false;
|
||||
|
||||
if use_cached_files {
|
||||
verity_hash = read_verity_from_store(cache_file, diff_id)?;
|
||||
info!("Using cache file");
|
||||
} else if let Err(e) = create_verity_hash_file(
|
||||
use_cached_files,
|
||||
layer_digest,
|
||||
base_dir,
|
||||
&get_decompressed_path(&verity_path),
|
||||
client,
|
||||
)
|
||||
.await
|
||||
{
|
||||
delete_files(base_dir, &file_name).await;
|
||||
bail!("{e}");
|
||||
info!("dm-verity root hash: {verity_hash}");
|
||||
}
|
||||
|
||||
match std::fs::read_to_string(&verity_path) {
|
||||
Err(e) => {
|
||||
delete_files(base_dir, &file_name).await;
|
||||
bail!("Failed to read {:?}, error {e}", &verity_path);
|
||||
if verity_hash.is_empty() {
|
||||
// go find verity hash if not found in cache
|
||||
if let Err(e) = create_decompressed_layer_file(
|
||||
client,
|
||||
layer_digest,
|
||||
&decompressed_path,
|
||||
&compressed_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error = true;
|
||||
error_message = format!("Failed to create verity hash for {layer_digest}, error {e}");
|
||||
}
|
||||
Ok(v) => {
|
||||
if !use_cached_files {
|
||||
let _ = std::fs::remove_dir_all(base_dir);
|
||||
|
||||
if !error {
|
||||
match get_verity_hash_value(&decompressed_path) {
|
||||
Err(e) => {
|
||||
error_message = format!("Failed to get verity hash {e}");
|
||||
error = true;
|
||||
}
|
||||
Ok(v) => {
|
||||
verity_hash = v;
|
||||
if use_cached_files {
|
||||
add_verity_to_store(cache_file, diff_id, &verity_hash)?;
|
||||
}
|
||||
info!("dm-verity root hash: {verity_hash}");
|
||||
}
|
||||
}
|
||||
info!("dm-verity root hash: {v}");
|
||||
Ok(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_verity_hash_file(
|
||||
use_cached_files: bool,
|
||||
layer_digest: &str,
|
||||
base_dir: &Path,
|
||||
decompressed_path: &PathBuf,
|
||||
client: &containerd_client::Client,
|
||||
) -> Result<()> {
|
||||
if use_cached_files && decompressed_path.exists() {
|
||||
info!("Using cached file {:?}", &decompressed_path);
|
||||
} else {
|
||||
std::fs::create_dir_all(base_dir)?;
|
||||
create_decompressed_layer_file(use_cached_files, layer_digest, decompressed_path, client)
|
||||
.await?;
|
||||
temp_dir.close()?;
|
||||
if error {
|
||||
// remove the cache file if we're using it
|
||||
if use_cached_files {
|
||||
std::fs::remove_file(cache_file)?;
|
||||
}
|
||||
warn!("{error_message}");
|
||||
}
|
||||
|
||||
do_create_verity_hash_file(decompressed_path)
|
||||
Ok(verity_hash)
|
||||
}
|
||||
|
||||
async fn create_decompressed_layer_file(
|
||||
use_cached_files: bool,
|
||||
layer_digest: &str,
|
||||
decompressed_path: &PathBuf,
|
||||
client: &containerd_client::Client,
|
||||
layer_digest: &str,
|
||||
decompressed_path: &Path,
|
||||
compressed_path: &Path,
|
||||
) -> Result<()> {
|
||||
let compressed_path = get_compressed_path(decompressed_path);
|
||||
info!("Pulling layer {layer_digest}");
|
||||
let mut file = tokio::fs::File::create(&compressed_path)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
if use_cached_files && compressed_path.exists() {
|
||||
info!("Using cached file {:?}", &compressed_path);
|
||||
} else {
|
||||
info!("Pulling layer {layer_digest}");
|
||||
let mut file = tokio::fs::File::create(&compressed_path)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
info!("Decompressing layer");
|
||||
|
||||
info!("Decompressing layer");
|
||||
let req = containerd_client::services::v1::ReadContentRequest {
|
||||
digest: layer_digest.to_string(),
|
||||
offset: 0,
|
||||
size: 0,
|
||||
};
|
||||
let req = with_namespace!(req, "k8s.io");
|
||||
let mut c = client.content();
|
||||
let resp = c.read(req).await?;
|
||||
let mut stream = resp.into_inner();
|
||||
|
||||
let req = containerd_client::services::v1::ReadContentRequest {
|
||||
digest: layer_digest.to_string(),
|
||||
offset: 0,
|
||||
size: 0,
|
||||
};
|
||||
let req = with_namespace!(req, "k8s.io");
|
||||
let mut c = client.content();
|
||||
let resp = c.read(req).await?;
|
||||
let mut stream = resp.into_inner();
|
||||
|
||||
while let Some(chunk) = stream.message().await? {
|
||||
if chunk.offset < 0 {
|
||||
print!("oop")
|
||||
}
|
||||
file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?;
|
||||
file.write_all(&chunk.data).await?;
|
||||
while let Some(chunk) = stream.message().await? {
|
||||
if chunk.offset < 0 {
|
||||
return Err(anyhow!("Too many Docker gzip layers"));
|
||||
}
|
||||
|
||||
file.flush()
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
.expect("Failed to flush file");
|
||||
file.seek(io::SeekFrom::Start(chunk.offset as u64)).await?;
|
||||
file.write_all(&chunk.data).await?;
|
||||
}
|
||||
|
||||
file.flush()
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
.expect("Failed to flush file");
|
||||
let compressed_file = std::fs::File::open(compressed_path).map_err(|e| anyhow!(e))?;
|
||||
let mut decompressed_file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
|
Loading…
Reference in New Issue
Block a user