From 2d6b1e6b13de07007efa8df30290bfefdf66b29e Mon Sep 17 00:00:00 2001 From: Fupan Li Date: Wed, 5 Feb 2025 17:17:17 +0800 Subject: [PATCH] runtime-rs: add the sandbox api support For Kata-Containers, we add SandboxService for these new calls alongside the existing TaskService, including processing requests and replies, and properly calling VirtSandbox's interfaces. By splitting the start logic of the sandbox, virt_container is compatible with calls from the SandboxService and TaskService. In addition, we modify the processing of resource configuration to solve the problem that SandboxService does not have a spec file when creating a pod. The sandbox api is supported since containerd 1.7, but it is enabled differently in containerd 2.0. To enable it in 2.0, you can support the sandbox api for a specific runtime by adding sandboxer = "shim" to its configuration; taking the kata runtime as an example: [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.kata] runtime_type = "io.containerd.kata.v2" sandboxer = "shim" privileged_without_host_devices = true pod_annotations = ["io.katacontainers.*"] For containerd version 1.7, you can enable it by: 1: add env ENABLE_CRI_SANDBOXES=true 2: add sandbox_mode = "shim" to runtime config. 
Acknowledgement This work was based on @wllenyj's POC code: (https://github.com/wllenyj/kata-containers/commit/f5b62a2d7c728d1b260afb10d9df144640d27a01) Signed-off-by: Fupan Li Signed-off-by: wllenyj --- src/runtime-rs/Cargo.lock | 2 + .../crates/runtimes/common/Cargo.toml | 1 + .../crates/runtimes/common/src/error.rs | 6 +- .../crates/runtimes/common/src/sandbox.rs | 1 + .../crates/runtimes/common/src/types/mod.rs | 78 +++++++++++ .../common/src/types/trans_from_shim.rs | 122 +++++++++++++++- .../common/src/types/trans_into_shim.rs | 132 +++++++++++++++++- src/runtime-rs/crates/runtimes/src/manager.rs | 74 +++++++++- .../crates/runtimes/virt_container/Cargo.toml | 2 +- src/runtime-rs/crates/service/Cargo.toml | 2 +- .../crates/service/src/task_service.rs | 2 +- src/runtime-rs/crates/shim-ctl/src/main.rs | 2 +- 12 files changed, 409 insertions(+), 15 deletions(-) diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 510765aef5..13d49ffdea 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -513,6 +513,7 @@ dependencies = [ "oci-spec", "persist", "protobuf 3.2.0", + "protocols", "resource", "runtime-spec", "serde_json", @@ -4647,6 +4648,7 @@ dependencies = [ "serde_json", "slog", "slog-scope", + "strum 0.24.1", "tokio", "toml 0.4.10", "tracing", diff --git a/src/runtime-rs/crates/runtimes/common/Cargo.toml b/src/runtime-rs/crates/runtimes/common/Cargo.toml index dcc8e46971..086fdae975 100644 --- a/src/runtime-rs/crates/runtimes/common/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/common/Cargo.toml @@ -28,3 +28,4 @@ kata-types = { path = "../../../../libs/kata-types" } runtime-spec = { path = "../../../../libs/runtime-spec" } oci-spec = { version = "0.6.8", features = ["runtime"] } resource = { path = "../../resource" } +protocols = { path = "../../../../libs/protocols"} diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs index a64ff64773..e9f8e19969 100644 --- 
a/src/runtime-rs/crates/runtimes/common/src/error.rs +++ b/src/runtime-rs/crates/runtimes/common/src/error.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::types::{ContainerProcess, TaskResponse}; +use crate::types::{ContainerProcess, SandboxResponse, TaskResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -12,6 +12,8 @@ pub enum Error { ContainerNotFound(String), #[error("failed to find process {0}")] ProcessNotFound(ContainerProcess), - #[error("unexpected response {0} to shim {1}")] + #[error("unexpected task response {0} to shim {1}")] UnexpectedResponse(TaskResponse, String), + #[error("unexpected sandbox response {0} to shim {1}")] + UnexpectedSandboxResponse(SandboxResponse, String), } diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index 72e972ca52..c8a61051ec 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -5,6 +5,7 @@ // use crate::{types::ContainerProcess, ContainerManager}; + use anyhow::Result; use async_trait::async_trait; use std::sync::Arc; diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index 3accf34581..3883204167 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -141,6 +141,32 @@ pub struct ContainerConfig { pub stderr: Option, } +#[derive(Debug, Clone, Display)] +pub enum SandboxRequest { + CreateSandbox(Box), + StartSandbox(SandboxID), + Platform(SandboxID), + StopSandbox(StopSandboxRequest), + WaitSandbox(SandboxID), + SandboxStatus(SandboxStatusRequest), + Ping(SandboxID), + ShutdownSandbox(SandboxID), +} + +/// Response: sandbox response to shim +/// Request and Response messages need to be paired +#[derive(Debug, Clone, Display)] +pub enum SandboxResponse { + CreateSandbox, + 
StartSandbox(StartSandboxInfo), + Platform(PlatformInfo), + StopSandbox, + WaitSandbox(SandboxExitInfo), + SandboxStatus(SandboxStatusInfo), + Ping, + ShutdownSandbox, +} + #[derive(Clone, Debug)] pub struct SandboxConfig { pub sandbox_id: String, @@ -152,6 +178,58 @@ pub struct SandboxConfig { pub state: runtime_spec::State, } +#[derive(Clone, Debug)] +pub struct SandboxID { + pub sandbox_id: String, +} + +#[derive(Clone, Debug)] +pub struct StartSandboxInfo { + pub pid: u32, + pub create_time: Option, +} + +#[derive(Clone, Debug)] +pub struct PlatformInfo { + pub os: String, + pub architecture: String, +} + +#[derive(Clone, Debug)] +pub struct StopSandboxRequest { + pub sandbox_id: String, + pub timeout_secs: u32, +} + +#[derive(Clone, Debug, Default)] +pub struct SandboxExitInfo { + pub exit_status: u32, + pub exited_at: Option, +} + +#[derive(Clone, Debug)] +pub struct SandboxStatusRequest { + pub sandbox_id: String, + pub verbose: bool, +} + +#[derive(Clone, Debug)] +pub struct SandboxStatusInfo { + pub sandbox_id: String, + pub pid: u32, + pub state: String, + pub created_at: Option, + pub exited_at: Option, +} + +#[derive(Default, Clone, Debug)] +pub struct SandboxStatus { + pub sandbox_id: String, + pub pid: u32, + pub state: String, + pub info: std::collections::HashMap, +} + #[derive(Debug, Clone)] pub struct PID { pub pid: u32, diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs index a35ea96cf3..0ace92610e 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -6,16 +6,26 @@ use super::{ ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, - ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest, + ResizePTYRequest, SandboxConfig, SandboxID, SandboxNetworkEnv, SandboxRequest, + SandboxStatusRequest, ShutdownRequest, 
StopSandboxRequest, TaskRequest, UpdateRequest, }; -use anyhow::{Context, Result}; -use containerd_shim_protos::api; + use kata_types::mount::Mount; use std::{ convert::{From, TryFrom}, path::PathBuf, }; +use protobuf::Message; +use runtime_spec; + +use protocols::api as cri_api_v1; + +use anyhow::{anyhow, Context, Result}; +use containerd_shim_protos::{api, sandbox_api}; + +pub const SANDBOX_API_V1: &str = "runtime.v1.PodSandboxConfig"; + fn trans_from_shim_mount(from: &api::Mount) -> Mount { let options = from.options.to_vec(); let mut read_only = false; @@ -37,6 +47,112 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount { } } +// A CreateSandboxRequest and its embedded PodSandboxConfig carry a lot of information for creating a sandbox. +// At present, only part of it (hostname, DNS servers/searches/options, netns, annotations, bundle) is used to build the SandboxConfig. +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::CreateSandboxRequest) -> Result { + let type_url = from.options.type_url.clone(); + if type_url != SANDBOX_API_V1 { + return Err(anyhow!(format!("unsupported type url: {}", type_url))); + }; + + let config = cri_api_v1::PodSandboxConfig::parse_from_bytes(&from.options.value)?; + + let mut dns: Vec = vec![]; + config.dns_config.map(|mut dns_config| { + dns.append(&mut dns_config.servers); + dns.append(&mut dns_config.searches); + dns.append(&mut dns_config.options); + }); + + Ok(SandboxRequest::CreateSandbox(Box::new(SandboxConfig { + sandbox_id: from.sandbox_id.clone(), + hostname: config.hostname, + dns, + network_env: SandboxNetworkEnv { + netns: Some(from.netns_path), + network_created: false, + }, + annotations: config.annotations.clone(), + hooks: None, + state: runtime_spec::State { + version: Default::default(), + id: from.sandbox_id, + status: runtime_spec::ContainerState::Creating, + pid: 0, + bundle: from.bundle_path, + annotations: config.annotations, + }, + }))) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn 
try_from(from: sandbox_api::StartSandboxRequest) -> Result { + Ok(SandboxRequest::StartSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::PlatformRequest) -> Result { + Ok(SandboxRequest::Platform(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::StopSandboxRequest) -> Result { + Ok(SandboxRequest::StopSandbox(StopSandboxRequest { + sandbox_id: from.sandbox_id, + timeout_secs: from.timeout_secs, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::WaitSandboxRequest) -> Result { + Ok(SandboxRequest::WaitSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::SandboxStatusRequest) -> Result { + Ok(SandboxRequest::SandboxStatus(SandboxStatusRequest { + sandbox_id: from.sandbox_id, + verbose: from.verbose, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::PingRequest) -> Result { + Ok(SandboxRequest::Ping(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + +impl TryFrom for SandboxRequest { + type Error = anyhow::Error; + fn try_from(from: sandbox_api::ShutdownSandboxRequest) -> Result { + Ok(SandboxRequest::ShutdownSandbox(SandboxID { + sandbox_id: from.sandbox_id, + })) + } +} + impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CreateTaskRequest) -> Result { diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index e146ace7ba..acb958c6f6 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ 
-10,12 +10,139 @@ use std::{ }; use anyhow::{anyhow, Result}; -use containerd_shim_protos::api; +use containerd_shim_protos::{api, sandbox_api}; use super::utils::option_system_time_into; -use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse}; +use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, SandboxResponse, TaskResponse}; use crate::error::Error; +impl TryFrom for sandbox_api::CreateSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::CreateSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::StartSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::StartSandbox(resp) => Ok(Self { + pid: resp.pid, + created_at: option_system_time_into(resp.create_time), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::PlatformResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::Platform(resp) => { + let mut sandbox_resp = Self::new(); + sandbox_resp.mut_platform().set_os(resp.os); + sandbox_resp + .mut_platform() + .set_architecture(resp.architecture); + + Ok(sandbox_resp) + } + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::StopSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::StopSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::WaitSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: 
SandboxResponse) -> Result { + match from { + SandboxResponse::WaitSandbox(resp) => Ok(Self { + exit_status: resp.exit_status, + exited_at: option_system_time_into(resp.exited_at), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::SandboxStatusResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::SandboxStatus(resp) => Ok(Self { + sandbox_id: resp.sandbox_id, + pid: resp.pid, + state: resp.state, + created_at: option_system_time_into(resp.created_at), + exited_at: option_system_time_into(resp.exited_at), + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::PingResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::Ping => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for sandbox_api::ShutdownSandboxResponse { + type Error = anyhow::Error; + fn try_from(from: SandboxResponse) -> Result { + match from { + SandboxResponse::ShutdownSandbox => Ok(Self::new()), + _ => Err(anyhow!(Error::UnexpectedSandboxResponse( + from, + type_name::().to_string() + ))), + } + } +} + impl From for api::WaitResponse { fn from(from: ProcessExitStatus) -> Self { Self { @@ -38,6 +165,7 @@ impl From for api::Status { } } } + impl From for api::StateResponse { fn from(from: ProcessStateInfo) -> Self { Self { diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 8187798948..b50effecd2 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -7,9 +7,13 @@ use anyhow::{anyhow, Context, Result}; use common::{ 
message::Message, - types::{ContainerProcess, SandboxConfig, TaskRequest, TaskResponse}, + types::{ + ContainerProcess, PlatformInfo, SandboxConfig, SandboxRequest, SandboxResponse, + StartSandboxInfo, TaskRequest, TaskResponse, + }, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, }; + use hypervisor::Param; use kata_sys_util::{mount::get_mount_path, spec::load_oci_spec}; use kata_types::{ @@ -29,9 +33,11 @@ use runtime_spec as spec; use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER; use std::{ collections::HashMap, + ops::Deref, path::{Path, PathBuf}, str::from_utf8, sync::Arc, + time::SystemTime, }; use tokio::fs; use tokio::sync::{mpsc::Sender, Mutex, RwLock}; @@ -374,7 +380,24 @@ impl RuntimeHandlerManager { } #[instrument(parent = &*(ROOTSPAN))] - pub async fn handler_message(&self, req: TaskRequest) -> Result { + pub async fn handler_sandbox_message(&self, req: SandboxRequest) -> Result { + if let SandboxRequest::CreateSandbox(sandbox_config) = req { + let config = sandbox_config.deref().clone(); + + self.sandbox_init_runtime_instance(config) + .await + .context("init sandboxed runtime")?; + + Ok(SandboxResponse::CreateSandbox) + } else { + self.handler_sandbox_request(req) + .await + .context("handler request") + } + } + + #[instrument(parent = &*(ROOTSPAN))] + pub async fn handler_task_message(&self, req: TaskRequest) -> Result { if let TaskRequest::CreateContainer(container_config) = req { // get oci spec let bundler_path = format!( @@ -429,14 +452,57 @@ impl RuntimeHandlerManager { Ok(TaskResponse::CreateContainer(shim_pid)) } else { - self.handler_request(req) + self.handler_task_request(req) .await .context("handler TaskRequest") } } + pub async fn handler_sandbox_request(&self, req: SandboxRequest) -> Result { + let instance = self + .get_runtime_instance() + .await + .context("get runtime instance")?; + let sandbox = instance.sandbox.clone(); + + match req { + SandboxRequest::CreateSandbox(req) => Err(anyhow!("Unreachable request {:?}", 
req)), + SandboxRequest::StartSandbox(_) => { + sandbox + .start() + .await + .context("start sandbox in sandbox handler")?; + Ok(SandboxResponse::StartSandbox(StartSandboxInfo { + pid: std::process::id(), + create_time: Some(SystemTime::now()), + })) + } + SandboxRequest::Platform(_) => Ok(SandboxResponse::Platform(PlatformInfo { + os: std::env::consts::OS.to_string(), + architecture: std::env::consts::ARCH.to_string(), + })), + SandboxRequest::StopSandbox(_) => { + sandbox.stop().await.context("stop sandbox")?; + + Ok(SandboxResponse::StopSandbox) + } + SandboxRequest::WaitSandbox(_) => { + Err(anyhow!("WaitSandbox request is not implemented yet")) + } + SandboxRequest::SandboxStatus(_) => { + Err(anyhow!("SandboxStatus request is not implemented yet")) + } + SandboxRequest::Ping(_) => Ok(SandboxResponse::Ping), + SandboxRequest::ShutdownSandbox(_) => { + sandbox.shutdown().await.context("shutdown sandbox")?; + + Ok(SandboxResponse::ShutdownSandbox) + } + } + } + #[instrument(parent = &(*ROOTSPAN))] - pub async fn handler_request(&self, req: TaskRequest) -> Result { + pub async fn handler_task_request(&self, req: TaskRequest) -> Result { let instance = self .get_runtime_instance() .await diff --git a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml index 0668cebfea..4e0670bb2c 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml @@ -45,4 +45,4 @@ default = ["cloud-hypervisor"] cloud-hypervisor = [] # Enable the build-in VMM Dragtonball -dragonball = [] \ No newline at end of file +dragonball = [] diff --git a/src/runtime-rs/crates/service/Cargo.toml b/src/runtime-rs/crates/service/Cargo.toml index 4a63318014..03caee1740 100644 --- a/src/runtime-rs/crates/service/Cargo.toml +++ b/src/runtime-rs/crates/service/Cargo.toml @@ -15,7 +15,7 @@ tracing = "0.1.36" ttrpc = "0.8.4" common = { path = "../runtimes/common" } -containerd-shim-protos 
= { version = "0.6.0", features = ["async", "sandbox"] } containerd-shim = { version = "0.6.0", features = ["async"] } logging = { path = "../../../libs/logging" } kata-types = { path = "../../../libs/kata-types" } diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index 9e47595b01..a6845bb85f 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -42,7 +42,7 @@ impl TaskService { let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); debug!(logger, "====> task service {:?}", &r); let resp = - self.handler.handler_message(r).await.map_err(|err| { + self.handler.handler_task_message(r).await.map_err(|err| { ttrpc::Error::Others(format!("failed to handle message {:?}", err)) })?; debug!(logger, "<==== task service {:?}", &resp); diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index 28f1b8a7e5..529f0308f1 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -29,7 +29,7 @@ async fn real_main() { stderr: None, }); - manager.handler_message(req).await.ok(); + manager.handler_task_message(req).await.ok(); } fn main() -> Result<(), Box> {