mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-22 17:59:31 +00:00
agent/image: move image service related code into image-rpc.rs
Move image service related code into image-rpc.rs, to simplify maintenance. Fixes: #7633 Signed-off-by: Jiang Liu <gerry@linux.alibaba.com> Co-authored-by: wllenyj <wllenyj@linux.alibaba.com> Co-authored-by: jordan9500 <jordan.jackson@ibm.com> Co-authored-by: stevenhorsman <steven@uk.ibm.com>
This commit is contained in:
parent
81980388d4
commit
f218a3104e
@ -5,6 +5,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
@ -12,7 +13,7 @@ use std::process::Command;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use image_rs::image::ImageClient;
|
||||
use protocols::image;
|
||||
@ -20,9 +21,11 @@ use tokio::sync::Mutex;
|
||||
use ttrpc::{self, error::get_rpc_status as ttrpc_error};
|
||||
|
||||
use crate::rpc::{verify_cid, CONTAINER_BASE};
|
||||
use crate::sandbox::Sandbox;
|
||||
use crate::AGENT_CONFIG;
|
||||
|
||||
// A marker to merge container spec for images pulled inside guest.
|
||||
const ANNO_K8S_IMAGE_NAME: &str = "io.kubernetes.cri.image-name";
|
||||
|
||||
const AA_PATH: &str = "/usr/local/bin/attestation-agent";
|
||||
|
||||
const AA_KEYPROVIDER_URI: &str =
|
||||
@ -48,14 +51,14 @@ fn sl() -> slog::Logger {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ImageService {
|
||||
sandbox: Arc<Mutex<Sandbox>>,
|
||||
attestation_agent_started: Arc<AtomicBool>,
|
||||
image_client: Arc<Mutex<ImageClient>>,
|
||||
images: Arc<Mutex<HashMap<String, String>>>,
|
||||
container_count: Arc<AtomicU16>,
|
||||
}
|
||||
|
||||
impl ImageService {
|
||||
pub fn new(sandbox: Arc<Mutex<Sandbox>>) -> Self {
|
||||
pub fn new() -> Self {
|
||||
env::set_var("CC_IMAGE_WORK_DIR", KATA_CC_IMAGE_WORK_DIR);
|
||||
|
||||
let mut image_client = ImageClient::default();
|
||||
@ -72,13 +75,22 @@ impl ImageService {
|
||||
}
|
||||
|
||||
Self {
|
||||
sandbox,
|
||||
attestation_agent_started: Arc::new(AtomicBool::new(false)),
|
||||
image_client: Arc::new(Mutex::new(image_client)),
|
||||
images: Arc::new(Mutex::new(HashMap::new())),
|
||||
container_count: Arc::new(AtomicU16::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the singleton instance of image service.
|
||||
pub async fn singleton() -> Result<ImageService> {
|
||||
IMAGE_SERVICE
|
||||
.lock()
|
||||
.await
|
||||
.clone()
|
||||
.ok_or_else(|| anyhow!("image service is uninitialized"))
|
||||
}
|
||||
|
||||
// pause image is packaged in rootfs for CC
|
||||
fn unpack_pause_image(cid: &str) -> Result<()> {
|
||||
let cc_pause_bundle = Path::new(KATA_CC_PAUSE_BUNDLE);
|
||||
@ -171,9 +183,7 @@ impl ImageService {
|
||||
let image = req.image();
|
||||
if cid.starts_with("pause") {
|
||||
Self::unpack_pause_image(&cid)?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
sandbox.images.insert(String::from(image), cid);
|
||||
self.add_image(String::from(image), cid).await;
|
||||
return Ok(image.to_owned());
|
||||
}
|
||||
|
||||
@ -240,10 +250,80 @@ impl ImageService {
|
||||
}
|
||||
};
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
sandbox.images.insert(String::from(image), cid);
|
||||
self.add_image(String::from(image), cid).await;
|
||||
Ok(image.to_owned())
|
||||
}
|
||||
|
||||
async fn add_image(&self, image: String, cid: String) {
|
||||
self.images.lock().await.insert(image, cid);
|
||||
}
|
||||
|
||||
// When being passed an image name through a container annotation, merge its
|
||||
// corresponding bundle OCI specification into the passed container creation one.
|
||||
pub async fn merge_bundle_oci(&self, container_oci: &mut oci::Spec) -> Result<()> {
|
||||
if let Some(image_name) = container_oci
|
||||
.annotations
|
||||
.get(&ANNO_K8S_IMAGE_NAME.to_string())
|
||||
{
|
||||
let images = self.images.lock().await;
|
||||
if let Some(container_id) = images.get(image_name) {
|
||||
let image_oci_config_path = Path::new(CONTAINER_BASE)
|
||||
.join(container_id)
|
||||
.join(CONFIG_JSON);
|
||||
debug!(
|
||||
sl(),
|
||||
"Image bundle config path: {:?}", image_oci_config_path
|
||||
);
|
||||
|
||||
let image_oci =
|
||||
oci::Spec::load(image_oci_config_path.to_str().ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Invalid container image OCI config path {:?}",
|
||||
image_oci_config_path
|
||||
)
|
||||
})?)
|
||||
.context("load image bundle")?;
|
||||
|
||||
if let Some(container_root) = container_oci.root.as_mut() {
|
||||
if let Some(image_root) = image_oci.root.as_ref() {
|
||||
let root_path = Path::new(CONTAINER_BASE)
|
||||
.join(container_id)
|
||||
.join(image_root.path.clone());
|
||||
container_root.path =
|
||||
String::from(root_path.to_str().ok_or_else(|| {
|
||||
anyhow!("Invalid container image root path {:?}", root_path)
|
||||
})?);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(container_process) = container_oci.process.as_mut() {
|
||||
if let Some(image_process) = image_oci.process.as_ref() {
|
||||
self.merge_oci_process(container_process, image_process);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Partially merge an OCI process specification into another one.
|
||||
fn merge_oci_process(&self, target: &mut oci::Process, source: &oci::Process) {
|
||||
if target.args.is_empty() && !source.args.is_empty() {
|
||||
target.args.append(&mut source.args.clone());
|
||||
}
|
||||
|
||||
if target.cwd == "/" && source.cwd != "/" {
|
||||
target.cwd = String::from(&source.cwd);
|
||||
}
|
||||
|
||||
for source_env in &source.env {
|
||||
let variable_name: Vec<&str> = source_env.split('=').collect();
|
||||
if !target.env.iter().any(|i| i.contains(variable_name[0])) {
|
||||
target.env.push(source_env.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -269,10 +349,7 @@ impl protocols::image_ttrpc_async::Image for ImageService {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ImageService;
|
||||
use crate::sandbox::Sandbox;
|
||||
use protocols::image;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cid_from_request() {
|
||||
@ -345,9 +422,7 @@ mod tests {
|
||||
},
|
||||
];
|
||||
|
||||
let logger = slog::Logger::root(slog::Discard, o!());
|
||||
let s = Sandbox::new(&logger).unwrap();
|
||||
let image_service = ImageService::new(Arc::new(Mutex::new(s)));
|
||||
let image_service = ImageService::new();
|
||||
for case in &cases {
|
||||
let mut req = image::PullImageRequest::new();
|
||||
req.set_image(case.image.to_string());
|
||||
@ -363,4 +438,139 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_cwd() {
|
||||
#[derive(Debug)]
|
||||
struct TestData<'a> {
|
||||
container_process_cwd: &'a str,
|
||||
image_process_cwd: &'a str,
|
||||
expected: &'a str,
|
||||
}
|
||||
|
||||
let tests = &[
|
||||
// Image cwd should override blank container cwd
|
||||
// TODO - how can we tell the user didn't specifically set it to `/` vs not setting at all? Is that scenario valid?
|
||||
TestData {
|
||||
container_process_cwd: "/",
|
||||
image_process_cwd: "/imageDir",
|
||||
expected: "/imageDir",
|
||||
},
|
||||
// Container cwd should override image cwd
|
||||
TestData {
|
||||
container_process_cwd: "/containerDir",
|
||||
image_process_cwd: "/imageDir",
|
||||
expected: "/containerDir",
|
||||
},
|
||||
// Container cwd should override blank image cwd
|
||||
TestData {
|
||||
container_process_cwd: "/containerDir",
|
||||
image_process_cwd: "/",
|
||||
expected: "/containerDir",
|
||||
},
|
||||
];
|
||||
|
||||
let image_service = ImageService::new();
|
||||
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let mut container_process = oci::Process {
|
||||
cwd: d.container_process_cwd.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let image_process = oci::Process {
|
||||
cwd: d.image_process_cwd.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
image_service.merge_oci_process(&mut container_process, &image_process);
|
||||
|
||||
assert_eq!(d.expected, container_process.cwd, "{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_env() {
|
||||
#[derive(Debug)]
|
||||
struct TestData {
|
||||
container_process_env: Vec<String>,
|
||||
image_process_env: Vec<String>,
|
||||
expected: Vec<String>,
|
||||
}
|
||||
|
||||
let tests = &[
|
||||
// Test that the pods environment overrides the images
|
||||
TestData {
|
||||
container_process_env: vec!["ISPRODUCTION=true".to_string()],
|
||||
image_process_env: vec!["ISPRODUCTION=false".to_string()],
|
||||
expected: vec!["ISPRODUCTION=true".to_string()],
|
||||
},
|
||||
// Test that multiple environment variables can be overrided
|
||||
TestData {
|
||||
container_process_env: vec![
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=false".to_string(),
|
||||
],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=false".to_string(),
|
||||
],
|
||||
},
|
||||
// Test that when none of the variables match do not override them
|
||||
TestData {
|
||||
container_process_env: vec!["ANOTHERENV=TEST".to_string()],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
},
|
||||
// Test a mix of both overriding and not
|
||||
TestData {
|
||||
container_process_env: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
let image_service = ImageService::new();
|
||||
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let mut container_process = oci::Process {
|
||||
env: d.container_process_env.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let image_process = oci::Process {
|
||||
env: d.image_process_env.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
image_service.merge_oci_process(&mut container_process, &image_process);
|
||||
|
||||
assert_eq!(d.expected, container_process.env, "{}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -90,7 +90,6 @@ use std::path::PathBuf;
|
||||
|
||||
pub const CONTAINER_BASE: &str = "/run/kata-containers";
|
||||
const MODPROBE_PATH: &str = "/sbin/modprobe";
|
||||
const ANNO_K8S_IMAGE_NAME: &str = "io.kubernetes.cri.image-name";
|
||||
const CONFIG_JSON: &str = "config.json";
|
||||
const INIT_TRUSTED_STORAGE: &str = "/usr/bin/kata-init-trusted-storage";
|
||||
const TRUSTED_STORAGE_DEVICE: &str = "/dev/trusted_store";
|
||||
@ -162,24 +161,6 @@ pub fn verify_cid(id: &str) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Partially merge an OCI process specification into another one.
|
||||
fn merge_oci_process(target: &mut oci::Process, source: &oci::Process) {
|
||||
if target.args.is_empty() && !source.args.is_empty() {
|
||||
target.args.append(&mut source.args.clone());
|
||||
}
|
||||
|
||||
if target.cwd == "/" && source.cwd != "/" {
|
||||
target.cwd = String::from(&source.cwd);
|
||||
}
|
||||
|
||||
for source_env in &source.env {
|
||||
let variable_name: Vec<&str> = source_env.split('=').collect();
|
||||
if !target.env.iter().any(|i| i.contains(variable_name[0])) {
|
||||
target.env.push(source_env.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentService {
|
||||
#[instrument]
|
||||
async fn do_create_container(
|
||||
@ -209,7 +190,8 @@ impl AgentService {
|
||||
|
||||
// In case of pulling image inside guest, we need to merge the image bundle OCI spec
|
||||
// into the container creation request OCI spec.
|
||||
self.merge_bundle_oci(&mut oci).await?;
|
||||
let image_service = image_rpc::ImageService::singleton().await?;
|
||||
image_service.merge_bundle_oci(&mut oci).await?;
|
||||
|
||||
// Some devices need some extra processing (the ones invoked with
|
||||
// --device for instance), and that's what this call is doing. It
|
||||
@ -668,54 +650,6 @@ impl AgentService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When being passed an image name through a container annotation, merge its
|
||||
// corresponding bundle OCI specification into the passed container creation one.
|
||||
async fn merge_bundle_oci(&self, container_oci: &mut oci::Spec) -> Result<()> {
|
||||
if let Some(image_name) = container_oci
|
||||
.annotations
|
||||
.get(&ANNO_K8S_IMAGE_NAME.to_string())
|
||||
{
|
||||
if let Some(container_id) = self.sandbox.clone().lock().await.images.get(image_name) {
|
||||
let image_oci_config_path = Path::new(CONTAINER_BASE)
|
||||
.join(container_id)
|
||||
.join(CONFIG_JSON);
|
||||
debug!(
|
||||
sl(),
|
||||
"Image bundle config path: {:?}", image_oci_config_path
|
||||
);
|
||||
|
||||
let image_oci =
|
||||
oci::Spec::load(image_oci_config_path.to_str().ok_or_else(|| {
|
||||
anyhow!(
|
||||
"Invalid container image OCI config path {:?}",
|
||||
image_oci_config_path
|
||||
)
|
||||
})?)
|
||||
.context("load image bundle")?;
|
||||
|
||||
if let Some(container_root) = container_oci.root.as_mut() {
|
||||
if let Some(image_root) = image_oci.root.as_ref() {
|
||||
let root_path = Path::new(CONTAINER_BASE)
|
||||
.join(container_id)
|
||||
.join(image_root.path.clone());
|
||||
container_root.path =
|
||||
String::from(root_path.to_str().ok_or_else(|| {
|
||||
anyhow!("Invalid container image root path {:?}", root_path)
|
||||
})?);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(container_process) = container_oci.process.as_mut() {
|
||||
if let Some(image_process) = image_oci.process.as_ref() {
|
||||
merge_oci_process(container_process, image_process);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -1774,7 +1708,7 @@ pub async fn start(
|
||||
let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
|
||||
let hservice = health_ttrpc::create_health(Arc::new(health_service));
|
||||
|
||||
let image_service = image_rpc::ImageService::new(s);
|
||||
let image_service = image_rpc::ImageService::new();
|
||||
*image_rpc::IMAGE_SERVICE.lock().await = Some(image_service.clone());
|
||||
let image_service =
|
||||
Arc::new(Box::new(image_service) as Box<dyn image_ttrpc::Image + Send + Sync>);
|
||||
@ -3083,135 +3017,4 @@ COMMIT
|
||||
"We should see the resulting rule"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_cwd() {
|
||||
#[derive(Debug)]
|
||||
struct TestData<'a> {
|
||||
container_process_cwd: &'a str,
|
||||
image_process_cwd: &'a str,
|
||||
expected: &'a str,
|
||||
}
|
||||
|
||||
let tests = &[
|
||||
// Image cwd should override blank container cwd
|
||||
// TODO - how can we tell the user didn't specifically set it to `/` vs not setting at all? Is that scenario valid?
|
||||
TestData {
|
||||
container_process_cwd: "/",
|
||||
image_process_cwd: "/imageDir",
|
||||
expected: "/imageDir",
|
||||
},
|
||||
// Container cwd should override image cwd
|
||||
TestData {
|
||||
container_process_cwd: "/containerDir",
|
||||
image_process_cwd: "/imageDir",
|
||||
expected: "/containerDir",
|
||||
},
|
||||
// Container cwd should override blank image cwd
|
||||
TestData {
|
||||
container_process_cwd: "/containerDir",
|
||||
image_process_cwd: "/",
|
||||
expected: "/containerDir",
|
||||
},
|
||||
];
|
||||
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let mut container_process = oci::Process {
|
||||
cwd: d.container_process_cwd.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let image_process = oci::Process {
|
||||
cwd: d.image_process_cwd.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
merge_oci_process(&mut container_process, &image_process);
|
||||
|
||||
assert_eq!(d.expected, container_process.cwd, "{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_env() {
|
||||
#[derive(Debug)]
|
||||
struct TestData {
|
||||
container_process_env: Vec<String>,
|
||||
image_process_env: Vec<String>,
|
||||
expected: Vec<String>,
|
||||
}
|
||||
|
||||
let tests = &[
|
||||
// Test that the pods environment overrides the images
|
||||
TestData {
|
||||
container_process_env: vec!["ISPRODUCTION=true".to_string()],
|
||||
image_process_env: vec!["ISPRODUCTION=false".to_string()],
|
||||
expected: vec!["ISPRODUCTION=true".to_string()],
|
||||
},
|
||||
// Test that multiple environment variables can be overrided
|
||||
TestData {
|
||||
container_process_env: vec![
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=false".to_string(),
|
||||
],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=false".to_string(),
|
||||
],
|
||||
},
|
||||
// Test that when none of the variables match do not override them
|
||||
TestData {
|
||||
container_process_env: vec!["ANOTHERENV=TEST".to_string()],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
},
|
||||
// Test a mix of both overriding and not
|
||||
TestData {
|
||||
container_process_env: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
],
|
||||
image_process_env: vec![
|
||||
"ISPRODUCTION=false".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
expected: vec![
|
||||
"ANOTHERENV=TEST".to_string(),
|
||||
"ISPRODUCTION=true".to_string(),
|
||||
"ISDEVELOPMENT=true".to_string(),
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let mut container_process = oci::Process {
|
||||
env: d.container_process_env.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let image_process = oci::Process {
|
||||
env: d.image_process_env.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
merge_oci_process(&mut container_process, &image_process);
|
||||
|
||||
assert_eq!(d.expected, container_process.env, "{}", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,6 @@ pub struct Sandbox {
|
||||
pub event_tx: Option<Sender<String>>,
|
||||
pub bind_watcher: BindWatcher,
|
||||
pub pcimap: HashMap<pci::Address, pci::Address>,
|
||||
pub images: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl Sandbox {
|
||||
@ -98,7 +97,6 @@ impl Sandbox {
|
||||
event_tx: Some(tx),
|
||||
bind_watcher: BindWatcher::new(),
|
||||
pcimap: HashMap::new(),
|
||||
images: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user