Merge pull request #7635 from jiangliu/image-service-singleton

CC | move image service related code into image-rpc.rs
This commit is contained in:
Jiang Liu 2023-08-21 22:01:46 +08:00 committed by GitHub
commit cfba372f17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 257 additions and 243 deletions

View File

@ -5,6 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::Path;
@ -12,18 +13,18 @@ use std::process::Command;
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::Arc;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use image_rs::image::ImageClient;
use protocols::image;
use tokio::sync::Mutex;
use ttrpc::{self, error::get_rpc_status as ttrpc_error};
use crate::rpc::{verify_cid, CONTAINER_BASE};
use crate::sandbox::Sandbox;
use crate::AGENT_CONFIG;
use image_rs::image::ImageClient;
use std::io::Write;
// A marker to merge container spec for images pulled inside guest.
const ANNO_K8S_IMAGE_NAME: &str = "io.kubernetes.cri.image-name";
const AA_PATH: &str = "/usr/local/bin/attestation-agent";
@ -38,46 +39,58 @@ const KATA_CC_IMAGE_WORK_DIR: &str = "/run/image/";
const KATA_CC_PAUSE_BUNDLE: &str = "/pause_bundle";
const CONFIG_JSON: &str = "config.json";
#[rustfmt::skip]
lazy_static! {
pub static ref IMAGE_SERVICE: Mutex<Option<ImageService>> = Mutex::new(None);
}
// Convenience function to obtain the scope logger.
fn sl() -> slog::Logger {
slog_scope::logger().new(o!("subsystem" => "cgroups"))
}
#[derive(Clone)]
pub struct ImageService {
sandbox: Arc<Mutex<Sandbox>>,
attestation_agent_started: AtomicBool,
attestation_agent_started: Arc<AtomicBool>,
image_client: Arc<Mutex<ImageClient>>,
images: Arc<Mutex<HashMap<String, String>>>,
container_count: Arc<AtomicU16>,
}
impl ImageService {
pub async fn new(sandbox: Arc<Mutex<Sandbox>>) -> Self {
pub fn new() -> Self {
env::set_var("CC_IMAGE_WORK_DIR", KATA_CC_IMAGE_WORK_DIR);
let mut image_client = ImageClient::default();
let image_policy_file = &AGENT_CONFIG.image_policy_file;
if !image_policy_file.is_empty() {
image_client.config.file_paths.sigstore_config = image_policy_file.clone();
if !AGENT_CONFIG.image_policy_file.is_empty() {
image_client.config.file_paths.sigstore_config = AGENT_CONFIG.image_policy_file.clone();
}
let simple_signing_sigstore_config = &AGENT_CONFIG.simple_signing_sigstore_config;
if !simple_signing_sigstore_config.is_empty() {
image_client.config.file_paths.sigstore_config = simple_signing_sigstore_config.clone();
if !AGENT_CONFIG.simple_signing_sigstore_config.is_empty() {
image_client.config.file_paths.sigstore_config =
AGENT_CONFIG.simple_signing_sigstore_config.clone();
}
let image_registry_auth_file = &AGENT_CONFIG.image_registry_auth_file;
if !image_registry_auth_file.is_empty() {
image_client.config.file_paths.auth_file = image_registry_auth_file.clone();
if !AGENT_CONFIG.image_registry_auth_file.is_empty() {
image_client.config.file_paths.auth_file =
AGENT_CONFIG.image_registry_auth_file.clone();
}
Self {
sandbox,
attestation_agent_started: AtomicBool::new(false),
attestation_agent_started: Arc::new(AtomicBool::new(false)),
image_client: Arc::new(Mutex::new(image_client)),
images: Arc::new(Mutex::new(HashMap::new())),
container_count: Arc::new(AtomicU16::new(0)),
}
}
/// Get the singleton instance of image service.
pub async fn singleton() -> Result<ImageService> {
IMAGE_SERVICE
.lock()
.await
.clone()
.ok_or_else(|| anyhow!("image service is uninitialized"))
}
// pause image is packaged in rootfs for CC
fn unpack_pause_image(cid: &str) -> Result<()> {
let cc_pause_bundle = Path::new(KATA_CC_PAUSE_BUNDLE);
@ -119,8 +132,9 @@ impl ImageService {
}
});
let mut config_file = fs::File::create(config_path)?;
config_file.write_all(ocicrypt_config.to_string().as_bytes())?;
fs::write(config_path, ocicrypt_config.to_string().as_bytes())?;
env::set_var("OCICRYPT_KEYPROVIDER_CONFIG", config_path);
// The Attestation Agent will run for the duration of the guest.
Command::new(AA_PATH)
@ -129,6 +143,7 @@ impl ImageService {
.arg("--getresource_sock")
.arg(AA_GETRESOURCE_URI)
.spawn()?;
Ok(())
}
@ -154,8 +169,6 @@ impl ImageService {
}
async fn pull_image(&self, req: &image::PullImageRequest) -> Result<String> {
env::set_var("OCICRYPT_KEYPROVIDER_CONFIG", OCICRYPT_CONFIG_PATH);
let https_proxy = &AGENT_CONFIG.https_proxy;
if !https_proxy.is_empty() {
env::set_var("HTTPS_PROXY", https_proxy);
@ -170,9 +183,7 @@ impl ImageService {
let image = req.image();
if cid.starts_with("pause") {
Self::unpack_pause_image(&cid)?;
let mut sandbox = self.sandbox.lock().await;
sandbox.images.insert(String::from(image), cid);
self.add_image(String::from(image), cid).await;
return Ok(image.to_owned());
}
@ -195,6 +206,7 @@ impl ImageService {
!aa_kbc_params.is_empty()
);
self.image_client.lock().await.config.auth = !aa_kbc_params.is_empty();
let decrypt_config = format!("provider:attestation-agent:{}", aa_kbc_params);
// Read enable signature verification from the agent config and set it in the image_client
let enable_signature_verification = &AGENT_CONFIG.enable_signature_verification;
@ -209,8 +221,6 @@ impl ImageService {
let bundle_path = Path::new(CONTAINER_BASE).join(&cid);
fs::create_dir_all(&bundle_path)?;
let decrypt_config = format!("provider:attestation-agent:{}", aa_kbc_params);
info!(sl(), "pull image {:?}, bundle path {:?}", cid, bundle_path);
// Image layers will store at KATA_CC_IMAGE_WORK_DIR, generated bundles
// with rootfs and config.json will store under CONTAINER_BASE/cid.
@ -240,10 +250,80 @@ impl ImageService {
}
};
let mut sandbox = self.sandbox.lock().await;
sandbox.images.insert(String::from(image), cid);
self.add_image(String::from(image), cid).await;
Ok(image.to_owned())
}
async fn add_image(&self, image: String, cid: String) {
self.images.lock().await.insert(image, cid);
}
// When being passed an image name through a container annotation, merge its
// corresponding bundle OCI specification into the passed container creation one.
pub async fn merge_bundle_oci(&self, container_oci: &mut oci::Spec) -> Result<()> {
if let Some(image_name) = container_oci
.annotations
.get(&ANNO_K8S_IMAGE_NAME.to_string())
{
let images = self.images.lock().await;
if let Some(container_id) = images.get(image_name) {
let image_oci_config_path = Path::new(CONTAINER_BASE)
.join(container_id)
.join(CONFIG_JSON);
debug!(
sl(),
"Image bundle config path: {:?}", image_oci_config_path
);
let image_oci =
oci::Spec::load(image_oci_config_path.to_str().ok_or_else(|| {
anyhow!(
"Invalid container image OCI config path {:?}",
image_oci_config_path
)
})?)
.context("load image bundle")?;
if let Some(container_root) = container_oci.root.as_mut() {
if let Some(image_root) = image_oci.root.as_ref() {
let root_path = Path::new(CONTAINER_BASE)
.join(container_id)
.join(image_root.path.clone());
container_root.path =
String::from(root_path.to_str().ok_or_else(|| {
anyhow!("Invalid container image root path {:?}", root_path)
})?);
}
}
if let Some(container_process) = container_oci.process.as_mut() {
if let Some(image_process) = image_oci.process.as_ref() {
self.merge_oci_process(container_process, image_process);
}
}
}
}
Ok(())
}
// Partially merge an OCI process specification into another one.
fn merge_oci_process(&self, target: &mut oci::Process, source: &oci::Process) {
if target.args.is_empty() && !source.args.is_empty() {
target.args.append(&mut source.args.clone());
}
if target.cwd == "/" && source.cwd != "/" {
target.cwd = String::from(&source.cwd);
}
for source_env in &source.env {
let variable_name: Vec<&str> = source_env.split('=').collect();
if !target.env.iter().any(|i| i.contains(variable_name[0])) {
target.env.push(source_env.to_string());
}
}
}
}
#[async_trait]
@ -269,10 +349,7 @@ impl protocols::image_ttrpc_async::Image for ImageService {
#[cfg(test)]
mod tests {
use super::ImageService;
use crate::sandbox::Sandbox;
use protocols::image;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::test]
async fn test_cid_from_request() {
@ -345,9 +422,7 @@ mod tests {
},
];
let logger = slog::Logger::root(slog::Discard, o!());
let s = Sandbox::new(&logger).unwrap();
let image_service = ImageService::new(Arc::new(Mutex::new(s))).await;
let image_service = ImageService::new();
for case in &cases {
let mut req = image::PullImageRequest::new();
req.set_image(case.image.to_string());
@ -363,4 +438,139 @@ mod tests {
}
}
}
#[tokio::test]
async fn test_merge_cwd() {
#[derive(Debug)]
struct TestData<'a> {
container_process_cwd: &'a str,
image_process_cwd: &'a str,
expected: &'a str,
}
let tests = &[
// Image cwd should override blank container cwd
// TODO - how can we tell the user didn't specifically set it to `/` vs not setting at all? Is that scenario valid?
TestData {
container_process_cwd: "/",
image_process_cwd: "/imageDir",
expected: "/imageDir",
},
// Container cwd should override image cwd
TestData {
container_process_cwd: "/containerDir",
image_process_cwd: "/imageDir",
expected: "/containerDir",
},
// Container cwd should override blank image cwd
TestData {
container_process_cwd: "/containerDir",
image_process_cwd: "/",
expected: "/containerDir",
},
];
let image_service = ImageService::new();
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let mut container_process = oci::Process {
cwd: d.container_process_cwd.to_string(),
..Default::default()
};
let image_process = oci::Process {
cwd: d.image_process_cwd.to_string(),
..Default::default()
};
image_service.merge_oci_process(&mut container_process, &image_process);
assert_eq!(d.expected, container_process.cwd, "{}", msg);
}
}
#[tokio::test]
async fn test_merge_env() {
#[derive(Debug)]
struct TestData {
container_process_env: Vec<String>,
image_process_env: Vec<String>,
expected: Vec<String>,
}
let tests = &[
// Test that the pods environment overrides the images
TestData {
container_process_env: vec!["ISPRODUCTION=true".to_string()],
image_process_env: vec!["ISPRODUCTION=false".to_string()],
expected: vec!["ISPRODUCTION=true".to_string()],
},
// Test that multiple environment variables can be overrided
TestData {
container_process_env: vec![
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=false".to_string(),
],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=false".to_string(),
],
},
// Test that when none of the variables match do not override them
TestData {
container_process_env: vec!["ANOTHERENV=TEST".to_string()],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
},
// Test a mix of both overriding and not
TestData {
container_process_env: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=true".to_string(),
],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
},
];
let image_service = ImageService::new();
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let mut container_process = oci::Process {
env: d.container_process_env.clone(),
..Default::default()
};
let image_process = oci::Process {
env: d.image_process_env.clone(),
..Default::default()
};
image_service.merge_oci_process(&mut container_process, &image_process);
assert_eq!(d.expected, container_process.env, "{}", msg);
}
}
}

View File

@ -90,7 +90,6 @@ use std::path::PathBuf;
pub const CONTAINER_BASE: &str = "/run/kata-containers";
const MODPROBE_PATH: &str = "/sbin/modprobe";
const ANNO_K8S_IMAGE_NAME: &str = "io.kubernetes.cri.image-name";
const CONFIG_JSON: &str = "config.json";
const INIT_TRUSTED_STORAGE: &str = "/usr/bin/kata-init-trusted-storage";
const TRUSTED_STORAGE_DEVICE: &str = "/dev/trusted_store";
@ -162,24 +161,6 @@ pub fn verify_cid(id: &str) -> Result<()> {
}
}
// Partially merge an OCI process specification into another one.
fn merge_oci_process(target: &mut oci::Process, source: &oci::Process) {
if target.args.is_empty() && !source.args.is_empty() {
target.args.append(&mut source.args.clone());
}
if target.cwd == "/" && source.cwd != "/" {
target.cwd = String::from(&source.cwd);
}
for source_env in &source.env {
let variable_name: Vec<&str> = source_env.split('=').collect();
if !target.env.iter().any(|i| i.contains(variable_name[0])) {
target.env.push(source_env.to_string());
}
}
}
impl AgentService {
#[instrument]
async fn do_create_container(
@ -207,8 +188,10 @@ impl AgentService {
"receive createcontainer, storages: {:?}", &req.storages
);
// Merge the image bundle OCI spec into the container creation request OCI spec.
self.merge_bundle_oci(&mut oci).await?;
// In case of pulling image inside guest, we need to merge the image bundle OCI spec
// into the container creation request OCI spec.
let image_service = image_rpc::ImageService::singleton().await?;
image_service.merge_bundle_oci(&mut oci).await?;
// Some devices need some extra processing (the ones invoked with
// --device for instance), and that's what this call is doing. It
@ -667,54 +650,6 @@ impl AgentService {
}
}
}
// When being passed an image name through a container annotation, merge its
// corresponding bundle OCI specification into the passed container creation one.
async fn merge_bundle_oci(&self, container_oci: &mut oci::Spec) -> Result<()> {
if let Some(image_name) = container_oci
.annotations
.get(&ANNO_K8S_IMAGE_NAME.to_string())
{
if let Some(container_id) = self.sandbox.clone().lock().await.images.get(image_name) {
let image_oci_config_path = Path::new(CONTAINER_BASE)
.join(container_id)
.join(CONFIG_JSON);
debug!(
sl(),
"Image bundle config path: {:?}", image_oci_config_path
);
let image_oci =
oci::Spec::load(image_oci_config_path.to_str().ok_or_else(|| {
anyhow!(
"Invalid container image OCI config path {:?}",
image_oci_config_path
)
})?)
.context("load image bundle")?;
if let Some(container_root) = container_oci.root.as_mut() {
if let Some(image_root) = image_oci.root.as_ref() {
let root_path = Path::new(CONTAINER_BASE)
.join(container_id)
.join(image_root.path.clone());
container_root.path =
String::from(root_path.to_str().ok_or_else(|| {
anyhow!("Invalid container image root path {:?}", root_path)
})?);
}
}
if let Some(container_process) = container_oci.process.as_mut() {
if let Some(image_process) = image_oci.process.as_ref() {
merge_oci_process(container_process, image_process);
}
}
}
}
Ok(())
}
}
#[async_trait]
@ -1773,9 +1708,11 @@ pub async fn start(
let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let hservice = health_ttrpc::create_health(Arc::new(health_service));
let image_service = Box::new(image_rpc::ImageService::new(s).await)
as Box<dyn image_ttrpc::Image + Send + Sync>;
let iservice = image_ttrpc::create_image(Arc::new(image_service));
let image_service = image_rpc::ImageService::new();
*image_rpc::IMAGE_SERVICE.lock().await = Some(image_service.clone());
let image_service =
Arc::new(Box::new(image_service) as Box<dyn image_ttrpc::Image + Send + Sync>);
let iservice = image_ttrpc::create_image(image_service);
let server = TtrpcServer::new()
.bind(server_address)?
@ -3080,135 +3017,4 @@ COMMIT
"We should see the resulting rule"
);
}
#[tokio::test]
async fn test_merge_cwd() {
#[derive(Debug)]
struct TestData<'a> {
container_process_cwd: &'a str,
image_process_cwd: &'a str,
expected: &'a str,
}
let tests = &[
// Image cwd should override blank container cwd
// TODO - how can we tell the user didn't specifically set it to `/` vs not setting at all? Is that scenario valid?
TestData {
container_process_cwd: "/",
image_process_cwd: "/imageDir",
expected: "/imageDir",
},
// Container cwd should override image cwd
TestData {
container_process_cwd: "/containerDir",
image_process_cwd: "/imageDir",
expected: "/containerDir",
},
// Container cwd should override blank image cwd
TestData {
container_process_cwd: "/containerDir",
image_process_cwd: "/",
expected: "/containerDir",
},
];
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let mut container_process = oci::Process {
cwd: d.container_process_cwd.to_string(),
..Default::default()
};
let image_process = oci::Process {
cwd: d.image_process_cwd.to_string(),
..Default::default()
};
merge_oci_process(&mut container_process, &image_process);
assert_eq!(d.expected, container_process.cwd, "{}", msg);
}
}
#[tokio::test]
async fn test_merge_env() {
#[derive(Debug)]
struct TestData {
container_process_env: Vec<String>,
image_process_env: Vec<String>,
expected: Vec<String>,
}
let tests = &[
// Test that the pods environment overrides the images
TestData {
container_process_env: vec!["ISPRODUCTION=true".to_string()],
image_process_env: vec!["ISPRODUCTION=false".to_string()],
expected: vec!["ISPRODUCTION=true".to_string()],
},
// Test that multiple environment variables can be overrided
TestData {
container_process_env: vec![
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=false".to_string(),
],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=false".to_string(),
],
},
// Test that when none of the variables match do not override them
TestData {
container_process_env: vec!["ANOTHERENV=TEST".to_string()],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
},
// Test a mix of both overriding and not
TestData {
container_process_env: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=true".to_string(),
],
image_process_env: vec![
"ISPRODUCTION=false".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
expected: vec![
"ANOTHERENV=TEST".to_string(),
"ISPRODUCTION=true".to_string(),
"ISDEVELOPMENT=true".to_string(),
],
},
];
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let mut container_process = oci::Process {
env: d.container_process_env.clone(),
..Default::default()
};
let image_process = oci::Process {
env: d.image_process_env.clone(),
..Default::default()
};
merge_oci_process(&mut container_process, &image_process);
assert_eq!(d.expected, container_process.env, "{}", msg);
}
}
}

View File

@ -64,7 +64,6 @@ pub struct Sandbox {
pub event_tx: Option<Sender<String>>,
pub bind_watcher: BindWatcher,
pub pcimap: HashMap<pci::Address, pci::Address>,
pub images: HashMap<String, String>,
}
impl Sandbox {
@ -98,7 +97,6 @@ impl Sandbox {
event_tx: Some(tx),
bind_watcher: BindWatcher::new(),
pcimap: HashMap::new(),
images: HashMap::new(),
})
}