From 8976fa8c5d1f2ddd614d5f8dae78f19e80bc422e Mon Sep 17 00:00:00 2001 From: Fupan Li Date: Wed, 18 Sep 2024 14:19:20 +0800 Subject: [PATCH] sandbox: Add the sandbox api support For Kata-Containers, we add SandboxService for these new calls alongside the existing TaskService, including processing requests and replies, and properly calling VirtSandbox's interfaces. By splitting the start logic of the sandbox, virt_container is compatible with calls from the SandboxService and TaskService. In addition, we modify the processing of resource configuration to solve the problem that SandboxService does not have a spec file when creating a pod. Sandbox api can be supported from containerd 1.7. But there's a difference from container 2.0. To enbale it from 2.0, you can support the sandbox api for a specific runtime by adding: sandboxer = "shim", take kata runtime as an example: [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.kata] runtime_type = "io.containerd.kata.v2" sandboxer = "shim" privileged_without_host_devices = true pod_annotations = ["io.katacontainers.*"] For container version 1.7, you can enable it by: 1: add env ENABLE_CRI_SANDBOXES=true 2: add sandbox_mode = "shim" to runtime config. Acknowledgement This work was based on @wllenyj's POC code: (https://github.com/wllenyj/kata-containers/commit/f5b62a2d7c728d1b260afb10d9df144640d27a01) Signed-off-by: Fupan Li Signed-off-by: wllenyj --- src/libs/shim-interface/src/lib.rs | 15 +- .../crates/resource/src/cgroups/mod.rs | 29 ++-- .../crates/resource/src/manager_inner.rs | 6 +- .../crates/runtimes/common/Cargo.toml | 1 + .../crates/runtimes/common/src/error.rs | 6 +- .../crates/runtimes/common/src/lib.rs | 2 +- .../crates/runtimes/common/src/sandbox.rs | 15 +- .../crates/runtimes/common/src/types/mod.rs | 70 ++++++++++ .../common/src/types/trans_from_shim.rs | 121 +++++++++++++++- .../common/src/types/trans_into_shim.rs | 131 +++++++++++++++++- src/runtime-rs/crates/runtimes/src/manager.rs | 123 ++++++++++++---- .../crates/runtimes/virt_container/Cargo.toml | 1 + .../runtimes/virt_container/src/sandbox.rs | 30 +++- src/runtime-rs/crates/service/Cargo.toml | 2 +- src/runtime-rs/crates/service/src/lib.rs | 1 + src/runtime-rs/crates/service/src/manager.rs | 20 ++- .../crates/service/src/sandbox_service.rs | 76 ++++++++++ .../crates/service/src/task_service.rs | 2 +- src/runtime-rs/crates/shim-ctl/src/main.rs | 2 +- src/runtime-rs/crates/shim/src/shim_start.rs | 9 +- 20 files changed, 591 insertions(+), 71 deletions(-) create mode 100644 src/runtime-rs/crates/service/src/sandbox_service.rs diff --git a/src/libs/shim-interface/src/lib.rs b/src/libs/shim-interface/src/lib.rs index 8a401c1aec..46ebe21027 100644 --- a/src/libs/shim-interface/src/lib.rs +++ b/src/libs/shim-interface/src/lib.rs @@ -37,8 +37,10 @@ fn get_uds_with_sid(short_id: &str, path: &str) -> Result { return Ok(format!("unix://{}", p.display())); } - let _ = fs::create_dir_all(kata_run_path.join(short_id)) - .context(format!("failed to create directory {:?}", kata_run_path.join(short_id))); + let _ = fs::create_dir_all(kata_run_path.join(short_id)).context(format!( + "failed to create directory {:?}", + kata_run_path.join(short_id) + )); let target_ids: Vec = fs::read_dir(&kata_run_path)? .filter_map(|e| { @@ -71,8 +73,11 @@ fn get_uds_with_sid(short_id: &str, path: &str) -> Result { } // return sandbox's storage path -pub fn sb_storage_path() -> String { - String::from(KATA_PATH) +pub fn sb_storage_path() -> Result<&'static str> { + //make sure the path existed + std::fs::create_dir_all(KATA_PATH).context(format!("failed to create dir: {}", KATA_PATH))?; + + Ok(KATA_PATH) } // returns the address of the unix domain socket(UDS) for communication with shim @@ -85,7 +90,7 @@ pub fn mgmt_socket_addr(sid: &str) -> Result { )); } - get_uds_with_sid(sid, &sb_storage_path()) + get_uds_with_sid(sid, sb_storage_path()?) } #[cfg(test)] diff --git a/src/runtime-rs/crates/resource/src/cgroups/mod.rs b/src/runtime-rs/crates/resource/src/cgroups/mod.rs index 96548f4f34..9aa07b70f0 100644 --- a/src/runtime-rs/crates/resource/src/cgroups/mod.rs +++ b/src/runtime-rs/crates/resource/src/cgroups/mod.rs @@ -29,6 +29,7 @@ use tokio::sync::RwLock; use crate::ResourceUpdateOp; const OS_ERROR_NO_SUCH_PROCESS: i32 = 3; +const SANDBOXED_CGROUP_PATH: &str = "kata_sandboxed_pod"; pub struct CgroupArgs { pub sid: String, @@ -44,19 +45,21 @@ pub struct CgroupConfig { impl CgroupConfig { fn new(sid: &str, toml_config: &TomlConfig) -> Result { let overhead_path = utils::gen_overhead_path(sid); - let spec = load_oci_spec()?; - let path = spec - .linux() - .clone() - .and_then(|linux| linux.cgroups_path().clone()) - .map(|path| { - // The trim of '/' is important, because cgroup_path is a relative path. - path.display() - .to_string() - .trim_start_matches('/') - .to_string() - }) - .unwrap_or_default(); + let path = if let Ok(spec) = load_oci_spec() { + spec.linux() + .clone() + .and_then(|linux| linux.cgroups_path().clone()) + .map(|path| { + // The trim of '/' is important, because cgroup_path is a relative path. + path.display() + .to_string() + .trim_start_matches('/') + .to_string() + }) + .unwrap_or_default() + } else { + format!("{}/{}", SANDBOXED_CGROUP_PATH, sid) + }; Ok(Self { path, diff --git a/src/runtime-rs/crates/resource/src/manager_inner.rs b/src/runtime-rs/crates/resource/src/manager_inner.rs index d2f2d24a3f..7a845c5814 100644 --- a/src/runtime-rs/crates/resource/src/manager_inner.rs +++ b/src/runtime-rs/crates/resource/src/manager_inner.rs @@ -67,9 +67,9 @@ impl ResourceManagerInner { .await .context("failed to create device manager")?; - let cgroups_resource = CgroupsResource::new(sid, &toml_config)?; - let cpu_resource = CpuResource::new(toml_config.clone())?; - let mem_resource = MemResource::new(init_size_manager)?; + let cgroups_resource = CgroupsResource::new(sid, &toml_config).context("load cgroup")?; + let cpu_resource = CpuResource::new(toml_config.clone()).context("load cpu resource")?; + let mem_resource = MemResource::new(init_size_manager).context("load memory resources")?; Ok(Self { sid: sid.to_string(), toml_config, diff --git a/src/runtime-rs/crates/runtimes/common/Cargo.toml b/src/runtime-rs/crates/runtimes/common/Cargo.toml index 49f2898b28..41c1a363c9 100644 --- a/src/runtime-rs/crates/runtimes/common/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/common/Cargo.toml @@ -28,3 +28,4 @@ kata-types = { path = "../../../../libs/kata-types" } runtime-spec = { path = "../../../../libs/runtime-spec" } oci-spec = { version = "0.6.8", features = ["runtime"] } resource = { path = "../../resource" } +protocols = { path = "../../../../libs/protocols"} diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs index a64ff64773..e9f8e19969 100644 --- a/src/runtime-rs/crates/runtimes/common/src/error.rs +++ b/src/runtime-rs/crates/runtimes/common/src/error.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::types::{ContainerProcess, TaskResponse}; +use crate::types::{ContainerProcess, SandboxResponse, TaskResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -12,6 +12,8 @@ pub enum Error { ContainerNotFound(String), #[error("failed to find process {0}")] ProcessNotFound(ContainerProcess), - #[error("unexpected response {0} to shim {1}")] + #[error("unexpected task response {0} to shim {1}")] UnexpectedResponse(TaskResponse, String), + #[error("unexpected sandbox response {0} to shim {1}")] + UnexpectedSandboxResponse(SandboxResponse, String), } diff --git a/src/runtime-rs/crates/runtimes/common/src/lib.rs b/src/runtime-rs/crates/runtimes/common/src/lib.rs index adb5ca0028..b6a3a0c09b 100644 --- a/src/runtime-rs/crates/runtimes/common/src/lib.rs +++ b/src/runtime-rs/crates/runtimes/common/src/lib.rs @@ -11,5 +11,5 @@ pub mod message; mod runtime_handler; pub use runtime_handler::{RuntimeHandler, RuntimeInstance}; mod sandbox; -pub use sandbox::{Sandbox, SandboxNetworkEnv}; +pub use sandbox::{Sandbox, SandboxNetworkEnv, SandboxStatus}; pub mod types; diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index 72e972ca52..85c6de1530 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -4,7 +4,10 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::{types::ContainerProcess, ContainerManager}; +use crate::{ + types::{ContainerProcess, SandboxExitInfo}, + ContainerManager, +}; use anyhow::Result; use async_trait::async_trait; use std::sync::Arc; @@ -15,6 +18,14 @@ pub struct SandboxNetworkEnv { pub network_created: bool, } +#[derive(Default, Clone, Debug)] +pub struct SandboxStatus { + pub sandbox_id: String, + pub pid: u32, + pub state: String, + pub info: std::collections::HashMap, +} + impl std::fmt::Debug for SandboxNetworkEnv { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SandboxNetworkEnv") @@ -27,6 +38,8 @@ impl std::fmt::Debug for SandboxNetworkEnv { #[async_trait] pub trait Sandbox: Send + Sync { async fn start(&self) -> Result<()>; + async fn status(&self) -> Result; + async fn wait(&self) -> Result; async fn stop(&self) -> Result<()>; async fn cleanup(&self) -> Result<()>; async fn shutdown(&self) -> Result<()>; diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index d7d81dcde6..2d36d1dad6 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -141,6 +141,32 @@ pub struct ContainerConfig { pub stderr: Option, } +#[derive(Debug, Clone, Display)] +pub enum SandboxRequest { + CreateSandbox(Box), + StartSandbox(SandboxID), + Platform(SandboxID), + StopSandbox(StopSandboxRequest), + WaitSandbox(SandboxID), + SandboxStatus(SandboxStatusRequest), + Ping(SandboxID), + ShutdownSandbox(SandboxID), +} + +/// Response: sandbox response to shim +/// Request and Response messages need to be paired +#[derive(Debug, Clone, Display)] +pub enum SandboxResponse { + CreateSandbox, + StartSandbox(StartSandboxInfo), + Platform(PlatformInfo), + StopSandbox, + WaitSandbox(SandboxExitInfo), + SandboxStatus(SandboxStatusInfo), + Ping, + ShutdownSandbox, +} + #[derive(Clone, Debug)] pub struct SandboxConfig { pub sandbox_id: String, @@ -152,6 +178,50 @@ pub struct SandboxConfig { pub state: runtime_spec::State, } +#[derive(Clone, Debug)] +pub struct SandboxID { + pub sandbox_id: String, +} + +#[derive(Clone, Debug)] +pub struct StartSandboxInfo { + pub pid: u32, + pub create_time: Option, +} + +#[derive(Clone, Debug)] +pub struct PlatformInfo { + pub os: String, + pub architecture: String, +} + +#[derive(Clone, Debug)] +pub struct StopSandboxRequest { + pub sandbox_id: String, + pub timeout_secs: u32, +} + +#[derive(Clone, Debug, Default)] +pub struct SandboxExitInfo { + pub exit_status: u32, + pub exited_at: Option, +} + +#[derive(Clone, Debug)] +pub struct SandboxStatusRequest { + pub sandbox_id: String, + pub verbose: bool, +} + +#[derive(Clone, Debug)] +pub struct SandboxStatusInfo { + pub sandbox_id: String, + pub pid: u32, + pub state: String, + pub created_at: Option, + pub exited_at: Option, +} + #[derive(Debug, Clone)] pub struct PID { pub pid: u32, diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs index a35ea96cf3..9582c0ea82 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -6,16 +6,25 @@ use super::{ ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, - ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest, + ResizePTYRequest, SandboxConfig, SandboxID, SandboxNetworkEnv, SandboxRequest, + SandboxStatusRequest, ShutdownRequest, StopSandboxRequest, TaskRequest, UpdateRequest, }; -use anyhow::{Context, Result}; -use containerd_shim_protos::api; use kata_types::mount::Mount; use std::{ convert::{From, TryFrom}, path::PathBuf, }; +use protobuf::Message; +use runtime_spec; + +use protocols::api as cri_api_v1; + +use anyhow::{anyhow, Context, Result}; +use containerd_shim_protos::{api, sandbox_api}; + +pub const SANDBOX_API_V1: &str = "runtime.v1.PodSandboxConfig"; + fn trans_from_shim_mount(from: &api::Mount) -> Mount { let options = from.options.to_vec(); let mut read_only = false; @@ -37,6 +46,112 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount { } } +// There're a lot of information to create a sandbox from CreateSandboxRequest and the internal PodSandboxConfig. +// At present, we only take out part of it to build SandboxConfig. +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::CreateSandboxRequest) -> Result { + let type_url = from.options.type_url.clone(); + if type_url != SANDBOX_API_V1 { + return Err(anyhow!(format!("unsupported type url: {}", type_url))); + }; + + let config = cri_api_v1::PodSandboxConfig::parse_from_bytes(&from.options.value)?; + + let mut dns: Vec = vec![]; + config.dns_config.map(|mut dns_config| { + dns.append(&mut dns_config.servers); + dns.append(&mut dns_config.servers); + dns.append(&mut dns_config.options); + }); + + Ok(SandboxRequest::CreateSandbox(Box::new(SandboxConfig { + sandbox_id: from.sandbox_id.clone(), + hostname: config.hostname, + dns, + network_env: SandboxNetworkEnv { + netns: Some(from.netns_path), + network_created: false, + }, + annotations: config.annotations.clone(), + hooks: None, + state: runtime_spec::State { + version: Default::default(), + id: from.sandbox_id, + status: runtime_spec::ContainerState::Creating, + pid: 0, + bundle: from.bundle_path, + annotations: config.annotations, + }, + }))) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::StartSandboxRequest) -> Result { + Ok(SandboxRequest::StartSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::PlatformRequest) -> Result { + Ok(SandboxRequest::Platform(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::StopSandboxRequest) -> Result { + Ok(SandboxRequest::StopSandbox(StopSandboxRequest { + sandbox_id: from.sandbox_id, + timeout_secs: from.timeout_secs, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::WaitSandboxRequest) -> Result { + Ok(SandboxRequest::WaitSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::SandboxStatusRequest) -> Result { + Ok(SandboxRequest::SandboxStatus(SandboxStatusRequest { + sandbox_id: from.sandbox_id, + verbose: from.verbose, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::PingRequest) -> Result { + Ok(SandboxRequest::Ping(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::ShutdownSandboxRequest) -> Result { + Ok(SandboxRequest::ShutdownSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CreateTaskRequest) -> Result { diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index e146ace7ba..1f23e6e2ee 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -10,10 +10,10 @@ use std::{ }; use anyhow::{anyhow, Result}; -use containerd_shim_protos::api; +use containerd_shim_protos::{api, sandbox_api}; use super::utils::option_system_time_into; -use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse}; +use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, SandboxResponse, TaskResponse}; use crate::error::Error; impl From for api::WaitResponse { @@ -26,6 +26,133 @@ impl From for api::WaitResponse { } } +impl TryFrom for sandbox_api::CreateSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::CreateSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::StartSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::StartSandbox(resp) => Ok(Self { + pid: resp.pid, + created_at: option_system_time_into(resp.create_time), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::PlatformResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::Platform(resp) => { + let mut sandbox_resp = Self::new(); + sandbox_resp.mut_platform().set_os(resp.os); + sandbox_resp + .mut_platform() + .set_architecture(resp.architecture); + + Ok(sandbox_resp) + } + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::StopSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::StopSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::WaitSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::WaitSandbox(resp) => Ok(Self { + exit_status: resp.exit_status, + exited_at: option_system_time_into(resp.exited_at), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::SandboxStatusResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::SandboxStatus(resp) => Ok(Self { + sandbox_id: resp.sandbox_id, + pid: resp.pid, + state: resp.state, + created_at: option_system_time_into(resp.created_at), + exited_at: option_system_time_into(resp.exited_at), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::PingResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::Ping => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::ShutdownSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::ShutdownSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + impl From for api::Status { fn from(from: ProcessStatus) -> Self { match from { diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 4c4c4766a8..38116815db 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -7,7 +7,10 @@ use anyhow::{anyhow, Context, Result}; use common::{ message::Message, - types::{ContainerProcess, SandboxConfig, TaskRequest, TaskResponse}, + types::{ + ContainerProcess, PlatformInfo, SandboxConfig, SandboxRequest, SandboxResponse, + SandboxStatusInfo, StartSandboxInfo, TaskRequest, TaskResponse, + }, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, }; use hypervisor::Param; @@ -29,9 +32,11 @@ use runtime_spec as spec; use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER; use std::{ collections::HashMap, + ops::Deref, path::{Path, PathBuf}, str::from_utf8, sync::Arc, + time::SystemTime, }; use tokio::fs; use tokio::sync::{mpsc::Sender, Mutex, RwLock}; @@ -138,14 +143,6 @@ impl RuntimeHandlerManagerInner { Ok(()) } - #[instrument] - async fn start_runtime_handler(&self) -> Result<()> { - if let Some(instance) = self.runtime_instance.as_ref() { - instance.sandbox.start().await.context("start sandbox")?; - } - Ok(()) - } - #[instrument] async fn try_init( &mut self, @@ -200,10 +197,6 @@ impl RuntimeHandlerManagerInner { .await .context("init runtime handler")?; - self.start_runtime_handler() - .await - .context("start runtime handler")?; - // the sandbox creation can reach here only once and the sandbox is created // so we can safely create the shim management socket right now // the unwrap here is safe because the runtime handler is correctly created @@ -318,6 +311,14 @@ impl RuntimeHandlerManager { state: &spec::State, options: &Option>, ) -> Result<()> { + let mut inner: tokio::sync::RwLockWriteGuard<'_, RuntimeHandlerManagerInner> = + self.inner.write().await; + + // return if runtime instance has init + if inner.runtime_instance.is_some() { + return Ok(()); + } + let mut dns: Vec = vec![]; let spec_mounts = spec.mounts().clone().unwrap_or_default(); @@ -357,9 +358,6 @@ impl RuntimeHandlerManager { network_created, }; - let mut inner: tokio::sync::RwLockWriteGuard<'_, RuntimeHandlerManagerInner> = - self.inner.write().await; - let sandbox_config = SandboxConfig { sandbox_id: inner.id.clone(), dns, @@ -374,18 +372,30 @@ impl RuntimeHandlerManager { } #[instrument] - async fn init_runtime_instance( - &self, - sandbox_config: SandboxConfig, - spec: Option<&oci::Spec>, - options: &Option>, - ) -> Result<()> { + async fn sandbox_init_runtime_instance(&self, sandbox_config: SandboxConfig) -> Result<()> { let mut inner = self.inner.write().await; - inner.try_init(sandbox_config, spec, options).await + inner.try_init(sandbox_config, None, &None).await } #[instrument(parent = &*(ROOTSPAN))] - pub async fn handler_message(&self, req: TaskRequest) -> Result { + pub async fn handler_sandbox_message(&self, req: SandboxRequest) -> Result { + if let SandboxRequest::CreateSandbox(sandbox_config) = req { + let config = sandbox_config.deref().clone(); + + self.sandbox_init_runtime_instance(config) + .await + .context("init sandboxed runtime")?; + + Ok(SandboxResponse::CreateSandbox) + } else { + self.handler_sandbox_request(req) + .await + .context("handler request") + } + } + + #[instrument(parent = &*(ROOTSPAN))] + pub async fn handler_task_message(&self, req: TaskRequest) -> Result { if let TaskRequest::CreateContainer(container_config) = req { // get oci spec let bundler_path = format!( @@ -406,11 +416,17 @@ impl RuntimeHandlerManager { self.task_init_runtime_instance(&spec, &state, &container_config.options) .await .context("try init runtime instance")?; - let instance = self + let instance: Arc = self .get_runtime_instance() .await .context("get runtime instance")?; + instance + .sandbox + .start() + .await + .context("start sandbox in task handler")?; + let container_id = container_config.container_id.clone(); let shim_pid = instance .container_manager @@ -434,14 +450,67 @@ impl RuntimeHandlerManager { Ok(TaskResponse::CreateContainer(shim_pid)) } else { - self.handler_request(req) + self.handler_task_request(req) .await .context("handler TaskRequest") } } + pub async fn handler_sandbox_request(&self, req: SandboxRequest) -> Result { + let instance = self + .get_runtime_instance() + .await + .context("get runtime instance")?; + let sandbox = instance.sandbox.clone(); + + match req { + SandboxRequest::CreateSandbox(req) => Err(anyhow!("Unreachable request {:?}", req)), + SandboxRequest::StartSandbox(_) => { + sandbox + .start() + .await + .context("start sandbox in sandbox handler")?; + Ok(SandboxResponse::StartSandbox(StartSandboxInfo { + pid: std::process::id(), + create_time: Some(SystemTime::now()), + })) + } + SandboxRequest::Platform(_) => Ok(SandboxResponse::Platform(PlatformInfo { + os: std::env::consts::OS.to_string(), + architecture: std::env::consts::ARCH.to_string(), + })), + SandboxRequest::StopSandbox(_) => { + sandbox.stop().await.context("stop sandbox")?; + + Ok(SandboxResponse::StopSandbox) + } + SandboxRequest::WaitSandbox(_) => { + let exit_info = sandbox.wait().await.context("wait sandbox")?; + + Ok(SandboxResponse::WaitSandbox(exit_info)) + } + SandboxRequest::SandboxStatus(_) => { + let status = sandbox.status().await?; + + Ok(SandboxResponse::SandboxStatus(SandboxStatusInfo { + sandbox_id: status.sandbox_id, + pid: status.pid, + state: status.state, + created_at: None, + exited_at: None, + })) + } + SandboxRequest::Ping(_) => Ok(SandboxResponse::Ping), + SandboxRequest::ShutdownSandbox(_) => { + sandbox.shutdown().await.context("shutdown sandbox")?; + + Ok(SandboxResponse::ShutdownSandbox) + } + } + } + #[instrument(parent = &(*ROOTSPAN))] - pub async fn handler_request(&self, req: TaskRequest) -> Result { + pub async fn handler_task_request(&self, req: TaskRequest) -> Result { let instance = self .get_runtime_instance() .await diff --git a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml index 0668cebfea..9e3ac19fd0 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml @@ -37,6 +37,7 @@ runtime-spec = { path = "../../../../libs/runtime-spec" } oci-spec = { version = "0.6.8", features = ["runtime"] } persist = { path = "../../persist"} resource = { path = "../../resource" } +strum = { version = "0.24.0", features = ["derive"] } [features] default = ["cloud-hypervisor"] diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs index 86491febe6..8b583c8b86 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs @@ -14,7 +14,10 @@ use async_trait::async_trait; use common::message::{Action, Message}; use common::types::utils::option_system_time_into; use common::types::ContainerProcess; -use common::{types::SandboxConfig, ContainerManager, Sandbox, SandboxNetworkEnv}; +use common::{ + types::{SandboxConfig, SandboxExitInfo}, + ContainerManager, Sandbox, SandboxNetworkEnv, SandboxStatus, +}; use containerd_shim_protos::events::task::{TaskExit, TaskOOM}; use hypervisor::VsockConfig; #[cfg(not(target_arch = "s390x"))] @@ -38,6 +41,7 @@ use resource::network::{dan_config_path, DanNetworkConfig, NetworkConfig, Networ use resource::{ResourceConfig, ResourceManager}; use runtime_spec as spec; use std::sync::Arc; +use strum::Display; use tokio::sync::{mpsc::Sender, Mutex, RwLock}; use tracing::instrument; @@ -51,7 +55,7 @@ pub struct SandboxRestoreArgs { pub sender: Sender, } -#[derive(Clone, Copy, PartialEq, Debug)] +#[derive(Clone, Copy, PartialEq, Debug, Display)] pub enum SandboxState { Init, Running, @@ -485,6 +489,28 @@ impl Sandbox for VirtSandbox { Ok(()) } + async fn status(&self) -> Result { + info!(sl!(), "get sandbox status"); + let inner = self.inner.read().await; + let state = inner.state.to_string(); + + Ok(SandboxStatus { + sandbox_id: self.sid.clone(), + pid: std::process::id(), + state, + ..Default::default() + }) + } + + async fn wait(&self) -> Result { + info!(sl!(), "wait sandbox"); + let exit_code = self.hypervisor.wait_vm().await.context("wait vm")?; + Ok(SandboxExitInfo { + exit_status: exit_code as u32, + exited_at: Some(std::time::SystemTime::now()), + }) + } + async fn stop(&self) -> Result<()> { let mut sandbox_inner = self.inner.write().await; diff --git a/src/runtime-rs/crates/service/Cargo.toml b/src/runtime-rs/crates/service/Cargo.toml index 528b2642c6..6f684712ab 100644 --- a/src/runtime-rs/crates/service/Cargo.toml +++ b/src/runtime-rs/crates/service/Cargo.toml @@ -15,7 +15,7 @@ tracing = "0.1.36" ttrpc = "0.8" common = { path = "../runtimes/common" } -containerd-shim-protos = { version = "0.6.0", features = ["async"] } +containerd-shim-protos = { version = "0.6.0", features = ["async", "sandbox"] } containerd-shim = { version = "0.6.0", features = ["async"] } logging = { path = "../../../libs/logging" } kata-types = { path = "../../../libs/kata-types" } diff --git a/src/runtime-rs/crates/service/src/lib.rs b/src/runtime-rs/crates/service/src/lib.rs index 7049760ec8..c72fc78e2d 100644 --- a/src/runtime-rs/crates/service/src/lib.rs +++ b/src/runtime-rs/crates/service/src/lib.rs @@ -11,6 +11,7 @@ logging::logger_with_subsystem!(sl, "service"); mod event; mod manager; +mod sandbox_service; mod task_service; pub use manager::ServiceManager; diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 8542a0c9e9..ea0b0fbab2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -17,7 +17,9 @@ use tokio::sync::mpsc::{channel, Receiver}; use ttrpc::asynchronous::Server; use crate::event::{new_event_publisher, Forwarder}; +use crate::sandbox_service::SandboxService; use crate::task_service::TaskService; +use containerd_shim_protos::sandbox_async; /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; @@ -136,25 +138,29 @@ impl ServiceManager { } fn registry_service(&mut self) -> Result<()> { - if let Some(t) = self.server.take() { + if let Some(s) = self.server.take() { + let sandbox_service = Arc::new(Box::new(SandboxService::new(self.handler.clone())) + as Box); + let s = s.register_service(sandbox_async::create_sandbox(sandbox_service)); + let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) as Box); - let t = t.register_service(shim_async::create_task(task_service)); - self.server = Some(t); + let s = s.register_service(shim_async::create_task(task_service)); + self.server = Some(s); } Ok(()) } async fn start_service(&mut self) -> Result<()> { - if let Some(t) = self.server.as_mut() { - t.start().await.context("task server start")?; + if let Some(s) = self.server.as_mut() { + s.start().await.context("task server start")?; } Ok(()) } async fn stop_service(&mut self) -> Result<()> { - if let Some(t) = self.server.as_mut() { - t.stop_listen().await; + if let Some(s) = self.server.as_mut() { + s.stop_listen().await; } Ok(()) } diff --git a/src/runtime-rs/crates/service/src/sandbox_service.rs b/src/runtime-rs/crates/service/src/sandbox_service.rs new file mode 100644 index 0000000000..2c30b86b67 --- /dev/null +++ b/src/runtime-rs/crates/service/src/sandbox_service.rs @@ -0,0 +1,76 @@ +// Copyright (c) 2019-2024 Alibaba Cloud +// Copyright (c) 2019-2024 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, +}; + +use async_trait::async_trait; +use common::types::{SandboxRequest, SandboxResponse}; +use containerd_shim_protos::{sandbox_api, sandbox_async}; +use runtimes::RuntimeHandlerManager; +use ttrpc::{self, r#async::TtrpcContext}; + +pub(crate) struct SandboxService { + handler: Arc, +} + +impl SandboxService { + pub(crate) fn new(handler: Arc) -> Self { + Self { handler } + } + + async fn handler_message( + &self, + ctx: &TtrpcContext, + req: TtrpcReq, + ) -> ttrpc::Result + where + SandboxRequest: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, + { + let r = req.try_into().map_err(|err| { + ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) + })?; + let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); + debug!(logger, "====> sandbox service {:?}", &r); + let resp = self + .handler + .handler_sandbox_message(r) + .await + .map_err(|err| { + ttrpc::Error::Others(format!("failed to handle sandbox message {:?}", err)) + })?; + debug!(logger, "<==== sandbox service {:?}", &resp); + resp.try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) + } +} + +macro_rules! impl_service { + ($($name: tt | $req: ty | $resp: ty),*) => { + #[async_trait] + impl sandbox_async::Sandbox for SandboxService { + $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { + self.handler_message(ctx, req).await + })* + } + }; +} + +impl_service!( + create_sandbox | sandbox_api::CreateSandboxRequest | sandbox_api::CreateSandboxResponse, + start_sandbox | sandbox_api::StartSandboxRequest | sandbox_api::StartSandboxResponse, + platform | sandbox_api::PlatformRequest | sandbox_api::PlatformResponse, + stop_sandbox | sandbox_api::StopSandboxRequest | sandbox_api::StopSandboxResponse, + wait_sandbox | sandbox_api::WaitSandboxRequest | sandbox_api::WaitSandboxResponse, + sandbox_status | sandbox_api::SandboxStatusRequest | sandbox_api::SandboxStatusResponse, + ping_sandbox | sandbox_api::PingRequest | sandbox_api::PingResponse, + shutdown_sandbox | sandbox_api::ShutdownSandboxRequest | sandbox_api::ShutdownSandboxResponse +); diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index 9e47595b01..a6845bb85f 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -42,7 +42,7 @@ impl TaskService { let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); debug!(logger, "====> task service {:?}", &r); let resp = - self.handler.handler_message(r).await.map_err(|err| { + self.handler.handler_task_message(r).await.map_err(|err| { ttrpc::Error::Others(format!("failed to handle message {:?}", err)) })?; debug!(logger, "<==== task service {:?}", &resp); diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index 28f1b8a7e5..529f0308f1 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -29,7 +29,7 @@ async fn real_main() { stderr: None, }); - manager.handler_message(req).await.ok(); + manager.handler_task_message(req).await.ok(); } fn main() -> Result<(), Box> { diff --git a/src/runtime-rs/crates/shim/src/shim_start.rs b/src/runtime-rs/crates/shim/src/shim_start.rs index 5071e3637f..4be820f8c5 100644 --- a/src/runtime-rs/crates/shim/src/shim_start.rs +++ b/src/runtime-rs/crates/shim/src/shim_start.rs @@ -34,8 +34,13 @@ impl ShimExecutor { fn do_start(&mut self) -> Result { let bundle_path = get_bundle_path().context("get bundle path")?; - let spec = self.load_oci_spec(&bundle_path)?; - let (container_type, id) = k8s::container_type_with_id(&spec); + + let mut container_type = ContainerType::PodSandbox; + let mut id = None; + + if let Ok(spec) = self.load_oci_spec(&bundle_path) { + (container_type, id) = k8s::container_type_with_id(&spec); + } match container_type { ContainerType::PodSandbox | ContainerType::SingleContainer => {