agent: move guest pull abilities to Confidential Data Hub

Image pull abilities are all moved to the separate component
Confidential Data Hub (CDH) and we only left the auxiliary functions
except pull_image in confidential_data_hub/image.rs

Signed-off-by: Xynnn007 <xynnn@linux.alibaba.com>
This commit is contained in:
Xynnn007 2025-06-13 09:53:09 +08:00
parent 5067aafd56
commit 4436fe6d99
4 changed files with 171 additions and 2818 deletions

2624
src/agent/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -162,9 +162,6 @@ clap.workspace = true
strum.workspace = true
strum_macros.workspace = true
# Image pull/decrypt
image-rs = { git = "https://github.com/confidential-containers/guest-components", rev = "0a06ef241190780840fbb0542e51b198f1f72b0b", default-features = false, optional = true }
# Agent Policy
cdi = { git = "https://github.com/cncf-tags/container-device-interface-rs", rev = "fba5677a8e7cc962fc6e495fcec98d7d765e332a" }
@ -202,12 +199,9 @@ test-utils.workspace = true
lto = true
[features]
# The default-pull feature supports all sharing images by virtio-fs, for guest-pull build with the guest-pull feature
default-pull = []
seccomp = ["rustjail/seccomp"]
standard-oci-runtime = ["rustjail/standard-oci-runtime"]
agent-policy = ["kata-agent-policy"]
guest-pull = ["image-rs/kata-cc-rustls-tls"]
[[bin]]
name = "kata-agent"

View File

@ -7,20 +7,14 @@
use safe_path::scoped_join;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use image_rs::builder::ClientBuilder;
use image_rs::image::ImageClient;
use kata_sys_util::validate::verify_id;
use oci_spec::runtime as oci;
use tokio::sync::Mutex;
use crate::rpc::CONTAINER_BASE;
use crate::AGENT_CONFIG;
use kata_types::mount::KATA_VIRTUAL_VOLUME_IMAGE_GUEST_PULL;
use protocols::agent::Storage;
@ -34,17 +28,14 @@ const K8S_CONTAINER_TYPE_KEYS: [&str; 2] = [
"io.kubernetes.cri-o.ContainerType",
];
#[rustfmt::skip]
lazy_static! {
pub static ref IMAGE_SERVICE: Arc<Mutex<Option<ImageService>>> = Arc::new(Mutex::new(None));
}
// Convenience function to obtain the scope logger.
fn sl() -> slog::Logger {
slog_scope::logger().new(o!("subsystem" => "image"))
}
// Function to copy a file if it does not exist at the destination
// This function creates a dir, writes a file and if necessary,
// overwrites an existing file.
fn copy_if_not_exists(src: &Path, dst: &Path) -> Result<()> {
if let Some(dst_dir) = dst.parent() {
fs::create_dir_all(dst_dir)?;
@ -53,175 +44,91 @@ fn copy_if_not_exists(src: &Path, dst: &Path) -> Result<()> {
Ok(())
}
pub struct ImageService {
image_client: ImageClient,
/// get guest pause image process specification
fn get_pause_image_process() -> Result<oci::Process> {
let guest_pause_bundle = Path::new(KATA_PAUSE_BUNDLE);
if !guest_pause_bundle.exists() {
bail!("Pause image not present in rootfs");
}
let guest_pause_config = scoped_join(guest_pause_bundle, CONFIG_JSON)?;
let image_oci = oci::Spec::load(guest_pause_config.to_str().ok_or_else(|| {
anyhow!(
"Failed to load the guest pause image config from {:?}",
guest_pause_config
)
})?)
.context("load image config file")?;
let image_oci_process = image_oci.process().as_ref().ok_or_else(|| {
anyhow!("The guest pause image config does not contain a process specification. Please check the pause image.")
})?;
Ok(image_oci_process.clone())
}
impl ImageService {
pub async fn new() -> Result<Self> {
let mut image_client_builder =
ClientBuilder::default().work_dir(KATA_IMAGE_WORK_DIR.into());
#[cfg(feature = "guest-pull")]
{
if !AGENT_CONFIG.image_registry_auth.is_empty() {
let registry_auth = &AGENT_CONFIG.image_registry_auth;
debug!(sl(), "Set registry auth file {:?}", registry_auth);
image_client_builder = image_client_builder
.authenticated_registry_credentials_uri(registry_auth.into());
}
/// pause image is packaged in rootfs
pub fn unpack_pause_image(cid: &str) -> Result<String> {
verify_id(cid).context("The guest pause image cid contains invalid characters.")?;
let enable_signature_verification = &AGENT_CONFIG.enable_signature_verification;
debug!(
sl(),
"Enable image signature verification: {:?}", enable_signature_verification
);
if !AGENT_CONFIG.image_policy_file.is_empty() && *enable_signature_verification {
let image_policy_file = &AGENT_CONFIG.image_policy_file;
debug!(sl(), "Use image policy file {:?}", image_policy_file);
image_client_builder =
image_client_builder.image_security_policy_uri(image_policy_file.into());
}
}
let image_client = image_client_builder.build().await?;
Ok(Self { image_client })
let guest_pause_bundle = Path::new(KATA_PAUSE_BUNDLE);
if !guest_pause_bundle.exists() {
bail!("Pause image not present in rootfs");
}
let guest_pause_config = scoped_join(guest_pause_bundle, CONFIG_JSON)?;
info!(sl(), "use guest pause image cid {:?}", cid);
/// get guest pause image process specification
fn get_pause_image_process() -> Result<oci::Process> {
let guest_pause_bundle = Path::new(KATA_PAUSE_BUNDLE);
if !guest_pause_bundle.exists() {
bail!("Pause image not present in rootfs");
}
let guest_pause_config = scoped_join(guest_pause_bundle, CONFIG_JSON)?;
let image_oci = oci::Spec::load(guest_pause_config.to_str().ok_or_else(|| {
anyhow!(
"Failed to load the guest pause image config from {:?}",
guest_pause_config
)
})?)
.context("load image config file")?;
let image_oci = oci::Spec::load(guest_pause_config.to_str().ok_or_else(|| {
anyhow!(
"Failed to load the guest pause image config from {:?}",
guest_pause_config
)
})?)
.context("load image config file")?;
let image_oci_process = image_oci.process().as_ref().ok_or_else(|| {
let image_oci_process = image_oci.process().as_ref().ok_or_else(|| {
anyhow!("The guest pause image config does not contain a process specification. Please check the pause image.")
})?;
Ok(image_oci_process.clone())
}
info!(
sl(),
"pause image oci process {:?}",
image_oci_process.clone()
);
/// pause image is packaged in rootfs
fn unpack_pause_image(cid: &str) -> Result<String> {
verify_id(cid).context("The guest pause image cid contains invalid characters.")?;
// Ensure that the args vector is not empty before accessing its elements.
// Check the number of arguments.
let args = if let Some(args_vec) = image_oci_process.args() {
args_vec
} else {
bail!("The number of args should be greater than or equal to one! Please check the pause image.");
};
let guest_pause_bundle = Path::new(KATA_PAUSE_BUNDLE);
if !guest_pause_bundle.exists() {
bail!("Pause image not present in rootfs");
}
let guest_pause_config = scoped_join(guest_pause_bundle, CONFIG_JSON)?;
info!(sl(), "use guest pause image cid {:?}", cid);
let pause_bundle = scoped_join(CONTAINER_BASE, cid)?;
fs::create_dir_all(&pause_bundle)?;
let pause_rootfs = scoped_join(&pause_bundle, "rootfs")?;
fs::create_dir_all(&pause_rootfs)?;
info!(sl(), "pause_rootfs {:?}", pause_rootfs);
let image_oci = oci::Spec::load(guest_pause_config.to_str().ok_or_else(|| {
anyhow!(
"Failed to load the guest pause image config from {:?}",
guest_pause_config
)
})?)
.context("load image config file")?;
copy_if_not_exists(&guest_pause_config, &pause_bundle.join(CONFIG_JSON))?;
let arg_path = Path::new(&args[0]).strip_prefix("/")?;
copy_if_not_exists(
&guest_pause_bundle.join("rootfs").join(arg_path),
&pause_rootfs.join(arg_path),
)?;
Ok(pause_rootfs.display().to_string())
}
let image_oci_process = image_oci.process().as_ref().ok_or_else(|| {
anyhow!("The guest pause image config does not contain a process specification. Please check the pause image.")
})?;
info!(
sl(),
"pause image oci process {:?}",
image_oci_process.clone()
);
// Ensure that the args vector is not empty before accessing its elements.
// Check the number of arguments.
let args = if let Some(args_vec) = image_oci_process.args() {
args_vec
} else {
bail!("The number of args should be greater than or equal to one! Please check the pause image.");
};
let pause_bundle = scoped_join(CONTAINER_BASE, cid)?;
fs::create_dir_all(&pause_bundle)?;
let pause_rootfs = scoped_join(&pause_bundle, "rootfs")?;
fs::create_dir_all(&pause_rootfs)?;
info!(sl(), "pause_rootfs {:?}", pause_rootfs);
copy_if_not_exists(&guest_pause_config, &pause_bundle.join(CONFIG_JSON))?;
let arg_path = Path::new(&args[0]).strip_prefix("/")?;
copy_if_not_exists(
&guest_pause_bundle.join("rootfs").join(arg_path),
&pause_rootfs.join(arg_path),
)?;
Ok(pause_rootfs.display().to_string())
}
/// check whether the image is for sandbox or for container.
fn is_sandbox(image_metadata: &HashMap<String, String>) -> bool {
let mut is_sandbox = false;
for key in K8S_CONTAINER_TYPE_KEYS.iter() {
if let Some(value) = image_metadata.get(key as &str) {
if value == "sandbox" {
is_sandbox = true;
break;
}
/// check whether the image is for sandbox or for container.
pub fn is_sandbox(image_metadata: &HashMap<String, String>) -> bool {
let mut is_sandbox = false;
for key in K8S_CONTAINER_TYPE_KEYS.iter() {
if let Some(value) = image_metadata.get(key as &str) {
if value == "sandbox" {
is_sandbox = true;
break;
}
}
is_sandbox
}
/// pull_image is used for call image-rs to pull image in the guest.
/// # Parameters
/// - `image`: Image name (exp: quay.io/prometheus/busybox:latest)
/// - `cid`: Container id
/// - `image_metadata`: Annotations about the image (exp: "containerd.io/snapshot/cri.layer-digest": "sha256:24fb2886d6f6c5d16481dd7608b47e78a8e92a13d6e64d87d57cb16d5f766d63")
/// # Returns
/// - The image rootfs bundle path. (exp. /run/kata-containers/cb0b47276ea66ee9f44cc53afa94d7980b57a52c3f306f68cb034e58d9fbd3c6/rootfs)
pub async fn pull_image(
&mut self,
image: &str,
cid: &str,
image_metadata: &HashMap<String, String>,
) -> Result<String> {
info!(sl(), "image metadata: {image_metadata:?}");
if Self::is_sandbox(image_metadata) {
let mount_path = Self::unpack_pause_image(cid)?;
return Ok(mount_path);
}
// Image layers will store at KATA_IMAGE_WORK_DIR, generated bundles
// with rootfs and config.json will store under CONTAINER_BASE/cid/images.
let bundle_path = scoped_join(CONTAINER_BASE, cid)?;
fs::create_dir_all(&bundle_path)?;
info!(sl(), "pull image {image:?}, bundle path {bundle_path:?}");
let res = self
.image_client
.pull_image(image, &bundle_path, &None, &None)
.await;
match res {
Ok(image) => {
info!(
sl(),
"pull and unpack image {image:?}, cid: {cid:?} succeeded."
);
}
Err(e) => {
error!(
sl(),
"pull and unpack image {image:?}, cid: {cid:?} failed with {:?}.",
e.to_string()
);
return Err(e);
}
};
let image_bundle_path = scoped_join(&bundle_path, "rootfs")?;
Ok(image_bundle_path.as_path().display().to_string())
}
is_sandbox
}
/// get_process overrides the OCI process spec with pause image process spec if needed
@ -239,58 +146,11 @@ pub fn get_process(
}
if guest_pull {
if let Some(a) = oci.annotations() {
if ImageService::is_sandbox(a) {
return ImageService::get_pause_image_process();
if is_sandbox(a) {
return get_pause_image_process();
}
}
}
Ok(ocip.clone())
}
/// Set proxy environment from AGENT_CONFIG
pub async fn set_proxy_env_vars() {
if env::var("HTTPS_PROXY").is_err() {
let https_proxy = &AGENT_CONFIG.https_proxy;
if !https_proxy.is_empty() {
env::set_var("HTTPS_PROXY", https_proxy);
}
}
match env::var("HTTPS_PROXY") {
Ok(val) => info!(sl(), "https_proxy is set to: {}", val),
Err(e) => info!(sl(), "https_proxy is not set ({})", e),
};
if env::var("NO_PROXY").is_err() {
let no_proxy = &AGENT_CONFIG.no_proxy;
if !no_proxy.is_empty() {
env::set_var("NO_PROXY", no_proxy);
}
}
match env::var("NO_PROXY") {
Ok(val) => info!(sl(), "no_proxy is set to: {}", val),
Err(e) => info!(sl(), "no_proxy is not set ({})", e),
};
}
/// Init the image service
pub async fn init_image_service() -> Result<()> {
let image_service = ImageService::new().await?;
*IMAGE_SERVICE.lock().await = Some(image_service);
Ok(())
}
pub async fn pull_image(
image: &str,
cid: &str,
image_metadata: &HashMap<String, String>,
) -> Result<String> {
let image_service = IMAGE_SERVICE.clone();
let mut image_service = image_service.lock().await;
let image_service = image_service
.as_mut()
.expect("Image Service not initialized");
image_service.pull_image(image, cid, image_metadata).await
}

View File

@ -1,4 +1,5 @@
// Copyright (c) 2023 Intel Corporation
// Copyright (c) 2025 Alibaba Cloud
//
// SPDX-License-Identifier: Apache-2.0
//
@ -15,14 +16,18 @@ use protocols::{
confidential_data_hub::GetResourceRequest,
confidential_data_hub_ttrpc_async,
confidential_data_hub_ttrpc_async::{
GetResourceServiceClient, SealedSecretServiceClient, SecureMountServiceClient,
GetResourceServiceClient, ImagePullServiceClient, SealedSecretServiceClient,
SecureMountServiceClient,
},
};
use safe_path::scoped_join;
use std::fs;
use std::os::unix::fs::symlink;
use std::path::Path;
use std::{os::unix::fs::symlink, path::PathBuf};
use tokio::sync::OnceCell;
pub mod image;
pub static CDH_CLIENT: OnceCell<CDHClient> = OnceCell::const_new();
const SEALED_SECRET_PREFIX: &str = "sealed.";
@ -41,6 +46,8 @@ pub struct CDHClient {
secure_mount_client: SecureMountServiceClient,
#[derivative(Debug = "ignore")]
get_resource_client: GetResourceServiceClient,
#[derivative(Debug = "ignore")]
image_pull_client: ImagePullServiceClient,
}
impl CDHClient {
@ -48,6 +55,8 @@ impl CDHClient {
let client = ttrpc::asynchronous::Client::connect(cdh_socket_uri)?;
let sealed_secret_client =
confidential_data_hub_ttrpc_async::SealedSecretServiceClient::new(client.clone());
let image_pull_client =
confidential_data_hub_ttrpc_async::ImagePullServiceClient::new(client.clone());
let secure_mount_client =
confidential_data_hub_ttrpc_async::SecureMountServiceClient::new(client.clone());
let get_resource_client =
@ -56,6 +65,7 @@ impl CDHClient {
sealed_secret_client,
secure_mount_client,
get_resource_client,
image_pull_client,
})
}
@ -111,6 +121,24 @@ impl CDHClient {
.await?;
Ok(res.Resource)
}
pub async fn pull_image(&self, image: &str, bundle_path: &str) -> Result<()> {
let req = confidential_data_hub::ImagePullRequest {
image_url: image.to_string(),
bundle_path: bundle_path.to_string(),
..Default::default()
};
let _ = self
.image_pull_client
.pull_image(
ttrpc::context::with_timeout(AGENT_CONFIG.cdh_api_timeout.as_nanos() as i64),
&req,
)
.await?;
Ok(())
}
}
pub async fn init_cdh_client(cdh_socket_uri: &str) -> Result<()> {
@ -119,6 +147,7 @@ pub async fn init_cdh_client(cdh_socket_uri: &str) -> Result<()> {
CDHClient::new(cdh_socket_uri).context("Failed to create CDH Client")
})
.await?;
Ok(())
}
@ -143,6 +172,29 @@ pub async fn unseal_env(env: &str) -> Result<String> {
Ok((*env.to_owned()).to_string())
}
/// pull_image is used for call confidential data hub to pull image in the guest.
/// Image layers will store at [`image::KATA_IMAGE_WORK_DIR`]`,
/// rootfs and config.json will store under given `bundle_path`.
///
/// # Parameters
/// - `image`: Image name (exp: quay.io/prometheus/busybox:latest)
/// - `bundle_path`: The path to store the image bundle (exp. /run/kata-containers/cb0b47276ea66ee9f44cc53afa94d7980b57a52c3f306f68cb034e58d9fbd3c6/rootfs)
pub async fn pull_image(image: &str, bundle_path: PathBuf) -> Result<String> {
fs::create_dir_all(&bundle_path)?;
info!(sl(), "pull image {image:?}, bundle path {bundle_path:?}");
let cdh_client = CDH_CLIENT
.get()
.expect("Confidential Data Hub not initialized");
cdh_client
.pull_image(image, bundle_path.to_string_lossy().as_ref())
.await?;
let image_bundle_path = scoped_join(&bundle_path, "rootfs")?;
Ok(image_bundle_path.as_path().display().to_string())
}
pub async fn unseal_file(path: &str) -> Result<()> {
let cdh_client = CDH_CLIENT
.get()
@ -211,7 +263,6 @@ pub async fn unseal_file(path: &str) -> Result<()> {
Ok(())
}
#[cfg(feature = "guest-pull")]
pub async fn secure_mount(
volume_type: &str,
options: &std::collections::HashMap<String, String>,
@ -262,6 +313,18 @@ mod tests {
}
}
#[async_trait]
impl confidential_data_hub_ttrpc_async::ImagePullService for TestService {
async fn pull_image(
&self,
_ctx: &::ttrpc::asynchronous::TtrpcContext,
_req: confidential_data_hub::ImagePullRequest,
) -> ttrpc::error::Result<confidential_data_hub::ImagePullResponse> {
let output = confidential_data_hub::ImagePullResponse::new();
Ok(output)
}
}
fn remove_if_sock_exist(sock_addr: &str) -> std::io::Result<()> {
let path = sock_addr
.strip_prefix("unix://")