Merge pull request #10349 from lifupan/main_nsandboxapi

sandbox: refactor the sandbox init process
This commit is contained in:
Xuewei Niu 2024-09-27 11:10:45 +08:00 committed by GitHub
commit 11b1a72442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 207 additions and 136 deletions

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::convert::TryFrom;
use std::{collections::HashMap, convert::TryFrom};
use anyhow::{Context, Result};
use kata_types::{
@ -22,17 +22,13 @@ struct InitialSize {
orig_toml_default_mem: u32,
}
// generate initial resource(vcpu and memory in MiB) from spec's information
impl TryFrom<&oci::Spec> for InitialSize {
// generate initial resource(vcpu and memory in MiB) from annotations
impl TryFrom<&HashMap<String, String>> for InitialSize {
type Error = anyhow::Error;
fn try_from(spec: &oci::Spec) -> Result<Self> {
fn try_from(an: &HashMap<String, String>) -> Result<Self> {
let mut vcpu: u32 = 0;
let mut mem_mb: u32 = 0;
match container_type(spec) {
// podsandbox, from annotation
ContainerType::PodSandbox => {
let spec_annos = spec.annotations().clone().unwrap_or_default();
let annotation = Annotation::new(spec_annos);
let annotation = Annotation::new(an.clone());
let (period, quota, memory) =
get_sizing_info(annotation).context("failed to get sizing info")?;
let mut cpu = oci::LinuxCpu::default();
@ -44,7 +40,27 @@ impl TryFrom<&oci::Spec> for InitialSize {
if let Ok(cpu_resource) = LinuxContainerCpuResources::try_from(&cpu) {
vcpu = get_nr_vcpu(&cpu_resource);
}
mem_mb = convert_memory_to_mb(memory);
let mem_mb = convert_memory_to_mb(memory);
Ok(Self {
vcpu,
mem_mb,
orig_toml_default_mem: 0,
})
}
}
// generate initial resource(vcpu and memory in MiB) from spec's information
impl TryFrom<&oci::Spec> for InitialSize {
type Error = anyhow::Error;
fn try_from(spec: &oci::Spec) -> Result<Self> {
let mut vcpu: u32 = 0;
let mut mem_mb: u32 = 0;
match container_type(spec) {
// podsandbox, from annotation
ContainerType::PodSandbox => {
let spec_annos = spec.annotations().clone().unwrap_or_default();
return InitialSize::try_from(&spec_annos);
}
// single container, from container spec
_ => {
@ -107,6 +123,13 @@ impl InitialSizeManager {
})
}
/// Builds the manager from sandbox annotations alone.
///
/// This is the counterpart of `new` for callers that have no OCI spec at hand:
/// the annotations are converted into an `InitialSize` through its `TryFrom`
/// implementation.
///
/// # Errors
///
/// Returns the conversion error, wrapped with the context
/// "failed to construct static resource".
pub fn new_from(annotation: &HashMap<String, String>) -> Result<Self> {
    let resource = InitialSize::try_from(annotation)
        .context("failed to construct static resource")?;

    Ok(Self { resource })
}
pub fn setup_config(&mut self, config: &mut TomlConfig) -> Result<()> {
// update this data to the hypervisor config for later use by hypervisor
let hypervisor_name = &config.runtime.hypervisor_name;

View File

@ -6,7 +6,7 @@
use std::sync::Arc;
use crate::{message::Message, ContainerManager, Sandbox};
use crate::{message::Message, types::SandboxConfig, ContainerManager, Sandbox};
use anyhow::Result;
use async_trait::async_trait;
use kata_types::config::TomlConfig;
@ -39,6 +39,7 @@ pub trait RuntimeHandler: Send + Sync {
msg_sender: Sender<Message>,
config: Arc<TomlConfig>,
init_size_manager: InitialSizeManager,
sandbox_config: SandboxConfig,
) -> Result<RuntimeInstance>;
fn cleanup(&self, id: &str) -> Result<()>;

View File

@ -7,8 +7,6 @@
use crate::{types::ContainerProcess, ContainerManager};
use anyhow::Result;
use async_trait::async_trait;
use oci_spec::runtime as oci;
use runtime_spec as spec;
use std::sync::Arc;
#[derive(Clone)]
@ -28,13 +26,7 @@ impl std::fmt::Debug for SandboxNetworkEnv {
#[async_trait]
pub trait Sandbox: Send + Sync {
async fn start(
&self,
dns: Vec<String>,
spec: &oci::Spec,
state: &spec::State,
network_env: SandboxNetworkEnv,
) -> Result<()>;
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn cleanup(&self) -> Result<()>;
async fn shutdown(&self) -> Result<()>;

View File

@ -10,11 +10,17 @@ mod trans_into_agent;
mod trans_into_shim;
pub mod utils;
use std::fmt;
use std::{
collections::{hash_map::RandomState, HashMap},
fmt,
};
use crate::SandboxNetworkEnv;
use anyhow::{Context, Result};
use kata_sys_util::validate;
use kata_types::mount::Mount;
use oci_spec::runtime as oci;
use strum::Display;
/// TaskRequest: TaskRequest from shim
@ -135,6 +141,17 @@ pub struct ContainerConfig {
pub stderr: Option<String>,
}
/// Sandbox-level settings collected up front and handed to
/// `RuntimeHandler::new_instance`, so the sandbox can later be started
/// without receiving the OCI spec, state, DNS list or network environment
/// as separate arguments.
#[derive(Clone, Debug)]
pub struct SandboxConfig {
/// Identifier of the sandbox.
pub sandbox_id: String,
/// Hostname forwarded to the agent in the create-sandbox request.
pub hostname: String,
/// DNS configuration lines forwarded to the agent in the create-sandbox request.
pub dns: Vec<String>,
/// Network namespace path (if any) and whether that namespace was created
/// by the runtime itself.
pub network_env: SandboxNetworkEnv,
/// Sandbox annotations; used to load the runtime configuration and, when no
/// OCI spec is available, to derive the initial sandbox size.
pub annotations: HashMap<String, String, RandomState>,
/// OCI hooks, if any. The prestart and create-runtime hooks are executed
/// while the sandbox starts.
pub hooks: Option<oci::Hooks>,
/// Runtime state passed to the OCI hook functions.
pub state: runtime_spec::State,
}
#[derive(Debug, Clone)]
pub struct PID {
pub pid: u32,

View File

@ -7,7 +7,7 @@ use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use common::{message::Message, types::SandboxConfig, RuntimeHandler, RuntimeInstance};
use kata_types::config::TomlConfig;
use resource::cpu_mem::initial_size::InitialSizeManager;
use tokio::sync::mpsc::Sender;
@ -34,6 +34,7 @@ impl RuntimeHandler for LinuxContainer {
_msg_sender: Sender<Message>,
_config: Arc<TomlConfig>,
_init_size_manager: InitialSizeManager,
_sandbox_config: SandboxConfig,
) -> Result<RuntimeInstance> {
todo!()
}

View File

@ -7,7 +7,7 @@
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{ContainerProcess, TaskRequest, TaskResponse},
types::{ContainerProcess, SandboxConfig, TaskRequest, TaskResponse},
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
};
use hypervisor::Param;
@ -92,10 +92,7 @@ impl RuntimeHandlerManagerInner {
#[instrument]
async fn init_runtime_handler(
&mut self,
spec: &oci::Spec,
state: &spec::State,
network_env: SandboxNetworkEnv,
dns: Vec<String>,
sandbox_config: SandboxConfig,
config: Arc<TomlConfig>,
init_size_manager: InitialSizeManager,
) -> Result<()> {
@ -117,6 +114,7 @@ impl RuntimeHandlerManagerInner {
self.msg_sender.clone(),
config.clone(),
init_size_manager,
sandbox_config,
)
.await
.context("new runtime instance")?;
@ -137,21 +135,22 @@ impl RuntimeHandlerManagerInner {
let instance = Arc::new(runtime_instance);
self.runtime_instance = Some(instance.clone());
// start sandbox
instance
.sandbox
.start(dns, spec, state, network_env)
.await
.context("start sandbox")?;
Ok(())
}
/// Starts the sandbox owned by the current runtime instance.
///
/// When no runtime instance has been created yet this is a no-op that
/// returns `Ok(())`. A failure from the sandbox is returned with the
/// context "start sandbox".
#[instrument]
async fn start_runtime_handler(&self) -> Result<()> {
    match &self.runtime_instance {
        Some(instance) => instance.sandbox.start().await.context("start sandbox"),
        None => Ok(()),
    }
}
#[instrument]
async fn try_init(
&mut self,
spec: &oci::Spec,
state: &spec::State,
mut sandbox_config: SandboxConfig,
spec: Option<&oci::Spec>,
options: &Option<Vec<u8>>,
) -> Result<()> {
// return early if the runtime instance has already been initialized
@ -159,8 +158,6 @@ impl RuntimeHandlerManagerInner {
return Ok(());
}
let mut dns: Vec<String> = vec![];
#[cfg(feature = "linux")]
LinuxContainer::init().context("init linux container")?;
#[cfg(feature = "wasm")]
@ -168,15 +165,8 @@ impl RuntimeHandlerManagerInner {
#[cfg(feature = "virt")]
VirtContainer::init().context("init virt container")?;
let spec_mounts = spec.mounts().clone().unwrap_or_default();
for m in &spec_mounts {
if get_mount_path(&Some(m.destination().clone())) == DEFAULT_GUEST_DNS_FILE {
let contents = fs::read_to_string(&Path::new(&get_mount_path(m.source()))).await?;
dns = contents.split('\n').map(|e| e.to_string()).collect();
}
}
let mut config = load_config(spec, options).context("load config")?;
let mut config =
load_config(&sandbox_config.annotations, options).context("load config")?;
// Sandbox sizing information *may* be provided in two scenarios:
// 1. The upper layer runtime (ie, containerd or crio) provide sandbox sizing information as an annotation
@ -186,8 +176,14 @@ impl RuntimeHandlerManagerInner {
// 2. If this is not a sandbox infrastructure container, but instead a standalone single container (analogous to "docker run..."),
// then the container spec itself will contain appropriate sizing information for the entire sandbox (since it is
// a single container).
let mut initial_size_manager =
InitialSizeManager::new(spec).context("failed to construct static resource manager")?;
let mut initial_size_manager = if let Some(spec) = spec {
InitialSizeManager::new(spec).context("failed to construct static resource manager")?
} else {
InitialSizeManager::new_from(&sandbox_config.annotations)
.context("failed to construct static resource manager")?
};
initial_size_manager
.setup_config(&mut config)
.context("failed to setup static resource mgmt config")?;
@ -195,54 +191,19 @@ impl RuntimeHandlerManagerInner {
update_component_log_level(&config);
let dan_path = dan_config_path(&config, &self.id);
let mut network_created = false;
// set netns to None if we want no network for the VM
let netns = if config.runtime.disable_new_netns {
None
} else if dan_path.exists() {
info!(sl!(), "Do not create a netns due to DAN");
None
} else {
let mut netns_path = None;
if let Some(linux) = &spec.linux() {
let linux_namespaces = linux.namespaces().clone().unwrap_or_default();
for ns in &linux_namespaces {
if ns.typ() != oci::LinuxNamespaceType::Network {
continue;
if config.runtime.disable_new_netns || dan_path.exists() {
sandbox_config.network_env.netns = None;
}
// get netns path from oci spec
if ns.path().is_some() {
netns_path = ns.path().clone().map(|p| p.display().to_string());
}
// if we get empty netns from oci spec, we need to create netns for the VM
else {
let ns_name = generate_netns_name();
let netns = NetNs::new(ns_name)?;
let path = Some(PathBuf::from(netns.path()).display().to_string());
netns_path = path;
network_created = true;
}
break;
}
}
netns_path
};
let network_env = SandboxNetworkEnv {
netns,
network_created,
};
self.init_runtime_handler(
spec,
state,
network_env,
dns,
Arc::new(config),
initial_size_manager,
)
self.init_runtime_handler(sandbox_config, Arc::new(config), initial_size_manager)
.await
.context("init runtime handler")?;
self.start_runtime_handler()
.await
.context("start runtime handler")?;
// the sandbox creation can reach here only once and the sandbox is created
// so we can safely create the shim management socket right now
// the unwrap here is safe because the runtime handler is correctly created
@ -294,7 +255,8 @@ impl RuntimeHandlerManager {
.context("failed to load the sandbox state")?;
let config = if let Ok(spec) = load_oci_spec() {
load_config(&spec, &None).context("load config")?
let annotations = spec.annotations().clone().unwrap_or_default();
load_config(&annotations, &None).context("load config")?
} else {
TomlConfig::default()
};
@ -350,14 +312,76 @@ impl RuntimeHandlerManager {
}
#[instrument]
async fn try_init_runtime_instance(
async fn task_init_runtime_instance(
&self,
spec: &oci::Spec,
state: &spec::State,
options: &Option<Vec<u8>>,
) -> Result<()> {
let mut dns: Vec<String> = vec![];
let spec_mounts = spec.mounts().clone().unwrap_or_default();
for m in &spec_mounts {
if get_mount_path(&Some(m.destination().clone())) == DEFAULT_GUEST_DNS_FILE {
let contents = fs::read_to_string(&Path::new(&get_mount_path(m.source()))).await?;
dns = contents.split('\n').map(|e| e.to_string()).collect();
}
}
let mut network_created = false;
let mut netns = None;
if let Some(linux) = &spec.linux() {
let linux_namespaces = linux.namespaces().clone().unwrap_or_default();
for ns in &linux_namespaces {
if ns.typ() != oci::LinuxNamespaceType::Network {
continue;
}
// get netns path from oci spec
if ns.path().is_some() {
netns = ns.path().clone().map(|p| p.display().to_string());
}
// if we get empty netns from oci spec, we need to create netns for the VM
else {
let ns_name = generate_netns_name();
let raw_netns = NetNs::new(ns_name)?;
let path = Some(PathBuf::from(raw_netns.path()).display().to_string());
netns = path;
network_created = true;
}
break;
}
}
let network_env = SandboxNetworkEnv {
netns,
network_created,
};
let mut inner: tokio::sync::RwLockWriteGuard<'_, RuntimeHandlerManagerInner> =
self.inner.write().await;
let sandbox_config = SandboxConfig {
sandbox_id: inner.id.clone(),
dns,
hostname: spec.hostname().clone().unwrap_or_default(),
network_env,
annotations: spec.annotations().clone().unwrap_or_default(),
hooks: spec.hooks().clone(),
state: state.clone(),
};
inner.try_init(sandbox_config, Some(spec), options).await
}
#[instrument]
async fn init_runtime_instance(
&self,
sandbox_config: SandboxConfig,
spec: Option<&oci::Spec>,
options: &Option<Vec<u8>>,
) -> Result<()> {
let mut inner = self.inner.write().await;
inner.try_init(spec, state, options).await
inner.try_init(sandbox_config, spec, options).await
}
#[instrument(parent = &*(ROOTSPAN))]
@ -379,7 +403,7 @@ impl RuntimeHandlerManager {
annotations: spec.annotations().clone().unwrap_or_default(),
};
self.try_init_runtime_instance(&spec, &state, &container_config.options)
self.task_init_runtime_instance(&spec, &state, &container_config.options)
.await
.context("try init runtime instance")?;
let instance = self
@ -528,9 +552,10 @@ impl RuntimeHandlerManager {
/// 4. If above three are not set, then get default path from DEFAULT_RUNTIME_CONFIGURATIONS
/// in kata-containers/src/libs/kata-types/src/config/default.rs, in array order.
#[instrument]
fn load_config(spec: &oci::Spec, option: &Option<Vec<u8>>) -> Result<TomlConfig> {
fn load_config(an: &HashMap<String, String>, option: &Option<Vec<u8>>) -> Result<TomlConfig> {
const KATA_CONF_FILE: &str = "KATA_CONF_FILE";
let annotation = Annotation::new(spec.annotations().clone().unwrap_or_default());
let annotation = Annotation::new(an.clone());
let config_path = if let Some(path) = annotation.get_sandbox_config_path() {
path
} else if let Ok(path) = std::env::var(KATA_CONF_FILE) {

View File

@ -19,7 +19,7 @@ use std::sync::Arc;
use agent::{kata::KataAgent, AGENT_KATA};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use common::{message::Message, types::SandboxConfig, RuntimeHandler, RuntimeInstance};
use hypervisor::Hypervisor;
#[cfg(not(target_arch = "s390x"))]
use hypervisor::{dragonball::Dragonball, HYPERVISOR_DRAGONBALL};
@ -91,6 +91,7 @@ impl RuntimeHandler for VirtContainer {
msg_sender: Sender<Message>,
config: Arc<TomlConfig>,
init_size_manager: InitialSizeManager,
sandbox_config: SandboxConfig,
) -> Result<RuntimeInstance> {
let hypervisor = new_hypervisor(&config).await.context("new hypervisor")?;
@ -114,6 +115,7 @@ impl RuntimeHandler for VirtContainer {
agent.clone(),
hypervisor.clone(),
resource_manager.clone(),
sandbox_config,
)
.await
.context("new virt sandbox")?;

View File

@ -14,7 +14,7 @@ use async_trait::async_trait;
use common::message::{Action, Message};
use common::types::utils::option_system_time_into;
use common::types::ContainerProcess;
use common::{ContainerManager, Sandbox, SandboxNetworkEnv};
use common::{types::SandboxConfig, ContainerManager, Sandbox, SandboxNetworkEnv};
use containerd_shim_protos::events::task::{TaskExit, TaskOOM};
use hypervisor::VsockConfig;
#[cfg(not(target_arch = "s390x"))]
@ -76,6 +76,7 @@ pub struct VirtSandbox {
agent: Arc<dyn Agent>,
hypervisor: Arc<dyn Hypervisor>,
monitor: Arc<HealthCheck>,
sandbox_config: Option<SandboxConfig>,
}
impl std::fmt::Debug for VirtSandbox {
@ -94,6 +95,7 @@ impl VirtSandbox {
agent: Arc<dyn Agent>,
hypervisor: Arc<dyn Hypervisor>,
resource_manager: Arc<ResourceManager>,
sandbox_config: SandboxConfig,
) -> Result<Self> {
let config = resource_manager.config().await;
let keep_abnormal = config.runtime.keep_abnormal;
@ -105,6 +107,7 @@ impl VirtSandbox {
hypervisor,
resource_manager,
monitor: Arc::new(HealthCheck::new(true, keep_abnormal)),
sandbox_config: Some(sandbox_config),
})
}
@ -293,8 +296,8 @@ impl VirtSandbox {
fn has_prestart_hooks(
&self,
prestart_hooks: Vec<oci::Hook>,
create_runtime_hooks: Vec<oci::Hook>,
prestart_hooks: &[oci::Hook],
create_runtime_hooks: &[oci::Hook],
) -> bool {
!prestart_hooks.is_empty() || !create_runtime_hooks.is_empty()
}
@ -303,32 +306,32 @@ impl VirtSandbox {
#[async_trait]
impl Sandbox for VirtSandbox {
#[instrument(name = "sb: start")]
async fn start(
&self,
dns: Vec<String>,
spec: &oci::Spec,
state: &spec::State,
network_env: SandboxNetworkEnv,
) -> Result<()> {
async fn start(&self) -> Result<()> {
let id = &self.sid;
// if sandbox running, return
// if sandbox not running try to start sandbox
if self.sandbox_config.is_none() {
return Err(anyhow!("sandbox config is missing"));
}
let sandbox_config = self.sandbox_config.as_ref().unwrap();
// if sandbox is not in SandboxState::Init then return,
// otherwise try to create sandbox
let mut inner = self.inner.write().await;
if inner.state == SandboxState::Running {
warn!(sl!(), "sandbox is running, no need to start");
if inner.state != SandboxState::Init {
warn!(sl!(), "sandbox is started");
return Ok(());
}
self.hypervisor
.prepare_vm(id, network_env.netns.clone())
.prepare_vm(id, sandbox_config.network_env.netns.clone())
.await
.context("prepare vm")?;
// generate device and setup before start vm
// should after hypervisor.prepare_vm
let resources = self
.prepare_for_start_sandbox(id, network_env.clone())
.prepare_for_start_sandbox(id, sandbox_config.network_env.clone())
.await?;
self.resource_manager
@ -341,7 +344,8 @@ impl Sandbox for VirtSandbox {
info!(sl!(), "start vm");
// execute pre-start hook functions, including Prestart Hooks and CreateRuntime Hooks
let (prestart_hooks, create_runtime_hooks) = if let Some(hooks) = spec.hooks().as_ref() {
let (prestart_hooks, create_runtime_hooks) =
if let Some(hooks) = sandbox_config.hooks.as_ref() {
(
hooks.prestart().clone().unwrap_or_default(),
hooks.create_runtime().clone().unwrap_or_default(),
@ -350,7 +354,11 @@ impl Sandbox for VirtSandbox {
(Vec::new(), Vec::new())
};
self.execute_oci_hook_functions(&prestart_hooks, &create_runtime_hooks, state)
self.execute_oci_hook_functions(
&prestart_hooks,
&create_runtime_hooks,
&sandbox_config.state,
)
.await?;
// 1. if there are pre-start hook functions, network config might have been changed.
@ -358,11 +366,11 @@ impl Sandbox for VirtSandbox {
// 2. Do not scan the netns if we want no network for the VM.
// TODO In case of vm factory, scan the netns to hotplug interfaces after the VM is started.
let config = self.resource_manager.config().await;
if self.has_prestart_hooks(prestart_hooks, create_runtime_hooks)
if self.has_prestart_hooks(&prestart_hooks, &create_runtime_hooks)
&& !config.runtime.disable_new_netns
&& !dan_config_path(&config, &self.sid).exists()
{
if let Some(netns_path) = network_env.netns {
if let Some(netns_path) = &sandbox_config.network_env.netns {
let network_resource = NetworkConfig::NetNs(NetworkWithNetNsConfig {
network_model: config.runtime.internetworking_model.clone(),
netns_path: netns_path.to_owned(),
@ -372,7 +380,7 @@ impl Sandbox for VirtSandbox {
.await
.network_info
.network_queues as usize,
network_created: network_env.network_created,
network_created: sandbox_config.network_env.network_created,
});
self.resource_manager
.handle_network(network_resource)
@ -402,8 +410,8 @@ impl Sandbox for VirtSandbox {
let agent_config = self.agent.agent_config().await;
let kernel_modules = KernelModule::set_kernel_modules(agent_config.kernel_modules)?;
let req = agent::CreateSandboxRequest {
hostname: spec.hostname().clone().unwrap_or_default(),
dns,
hostname: sandbox_config.hostname.clone(),
dns: sandbox_config.dns.clone(),
storages: self
.resource_manager
.get_storage_for_sandbox()
@ -700,6 +708,7 @@ impl Persist for VirtSandbox {
hypervisor,
resource_manager,
monitor: Arc::new(HealthCheck::new(true, keep_abnormal)),
sandbox_config: None,
})
}
}

View File

@ -7,7 +7,7 @@ use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use common::{message::Message, types::SandboxConfig, RuntimeHandler, RuntimeInstance};
use kata_types::config::TomlConfig;
use resource::cpu_mem::initial_size::InitialSizeManager;
use tokio::sync::mpsc::Sender;
@ -33,6 +33,7 @@ impl RuntimeHandler for WasmContainer {
_msg_sender: Sender<Message>,
_config: Arc<TomlConfig>,
_init_size_manager: InitialSizeManager,
_sandbox_config: SandboxConfig,
) -> Result<RuntimeInstance> {
todo!()
}