runtime-rs: add the sandbox api support

For Kata-Containers, we add a SandboxService for these new calls alongside
the existing TaskService. It processes the requests and replies and calls
the matching VirtSandbox interfaces. By splitting the start logic of the
sandbox, virt_container stays compatible with calls from both the
SandboxService and the TaskService. In addition, we modify the processing of
the resource configuration to solve the problem that the SandboxService does
not have a spec file when creating a pod.

The sandbox API is supported starting with containerd 1.7, but the way to
enable it differs from containerd 2.0.
To enable it on containerd 2.0, turn on the sandbox API for a specific
runtime by adding sandboxer = "shim" to its configuration. Taking the kata
runtime as an example:

[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.kata]
          runtime_type = "io.containerd.kata.v2"
          sandboxer = "shim"
          privileged_without_host_devices = true
          pod_annotations = ["io.katacontainers.*"]

For containerd version 1.7, you can enable it by:

1: add env ENABLE_CRI_SANDBOXES=true
2: add sandbox_mode = "shim" to runtime config.

Acknowledgement

This work was based on @wllenyj's POC code:
(f5b62a2d7c)

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
Signed-off-by: wllenyj <wllenyj@linux.alibaba.com>
This commit is contained in:
Fupan Li 2025-02-05 17:17:17 +08:00 committed by Pavel Mores
parent 65e908a584
commit 2d6b1e6b13
12 changed files with 409 additions and 15 deletions

View File

@ -513,6 +513,7 @@ dependencies = [
"oci-spec",
"persist",
"protobuf 3.2.0",
"protocols",
"resource",
"runtime-spec",
"serde_json",
@ -4647,6 +4648,7 @@ dependencies = [
"serde_json",
"slog",
"slog-scope",
"strum 0.24.1",
"tokio",
"toml 0.4.10",
"tracing",

View File

@ -28,3 +28,4 @@ kata-types = { path = "../../../../libs/kata-types" }
runtime-spec = { path = "../../../../libs/runtime-spec" }
oci-spec = { version = "0.6.8", features = ["runtime"] }
resource = { path = "../../resource" }
protocols = { path = "../../../../libs/protocols"}

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::types::{ContainerProcess, TaskResponse};
use crate::types::{ContainerProcess, SandboxResponse, TaskResponse};
#[derive(thiserror::Error, Debug)]
pub enum Error {
@ -12,6 +12,8 @@ pub enum Error {
ContainerNotFound(String),
#[error("failed to find process {0}")]
ProcessNotFound(ContainerProcess),
#[error("unexpected response {0} to shim {1}")]
#[error("unexpected task response {0} to shim {1}")]
UnexpectedResponse(TaskResponse, String),
#[error("unexpected sandbox response {0} to shim {1}")]
UnexpectedSandboxResponse(SandboxResponse, String),
}

View File

@ -5,6 +5,7 @@
//
use crate::{types::ContainerProcess, ContainerManager};
use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;

View File

@ -141,6 +141,32 @@ pub struct ContainerConfig {
pub stderr: Option<String>,
}
/// Request: shim request to the sandbox.
/// Each variant carries the arguments of one sandbox API call after it has been
/// converted from the corresponding `sandbox_api` request message.
#[derive(Debug, Clone, Display)]
pub enum SandboxRequest {
/// Create a sandbox from the given configuration (boxed, as it is the largest payload).
CreateSandbox(Box<SandboxConfig>),
/// Start the sandbox with the given id.
StartSandbox(SandboxID),
/// Query the platform (OS / architecture) of the sandbox.
Platform(SandboxID),
/// Stop the sandbox; carries the sandbox id and a timeout in seconds.
StopSandbox(StopSandboxRequest),
/// Wait for the sandbox with the given id.
WaitSandbox(SandboxID),
/// Query the status of a sandbox; carries the sandbox id and a `verbose` flag.
SandboxStatus(SandboxStatusRequest),
/// Ping the sandbox service.
Ping(SandboxID),
/// Shut down the sandbox with the given id.
ShutdownSandbox(SandboxID),
}
/// Response: sandbox response to shim
/// Request and Response messages need to be paired
///
/// Each variant is converted into the `sandbox_api` response message of the same name;
/// converting a variant into a non-matching message type yields an error.
#[derive(Debug, Clone, Display)]
pub enum SandboxResponse {
/// Reply to `SandboxRequest::CreateSandbox`; carries no data.
CreateSandbox,
/// Reply to `SandboxRequest::StartSandbox`; carries a pid and a creation time.
StartSandbox(StartSandboxInfo),
/// Reply to `SandboxRequest::Platform`; carries the OS and architecture names.
Platform(PlatformInfo),
/// Reply to `SandboxRequest::StopSandbox`; carries no data.
StopSandbox,
/// Reply to `SandboxRequest::WaitSandbox`; carries the exit status and exit time.
WaitSandbox(SandboxExitInfo),
/// Reply to `SandboxRequest::SandboxStatus`; carries the sandbox status fields.
SandboxStatus(SandboxStatusInfo),
/// Reply to `SandboxRequest::Ping`; carries no data.
Ping,
/// Reply to `SandboxRequest::ShutdownSandbox`; carries no data.
ShutdownSandbox,
}
#[derive(Clone, Debug)]
pub struct SandboxConfig {
pub sandbox_id: String,
@ -152,6 +178,58 @@ pub struct SandboxConfig {
pub state: runtime_spec::State,
}
/// Identifies the sandbox a request refers to.
/// Used as the payload of the sandbox requests that need nothing but the id.
#[derive(Clone, Debug)]
pub struct SandboxID {
/// Sandbox id, copied from the incoming request message.
pub sandbox_id: String,
}
/// Payload of `SandboxResponse::StartSandbox`.
#[derive(Clone, Debug)]
pub struct StartSandboxInfo {
/// Process id reported back in the start response.
pub pid: u32,
/// Creation time reported back in the start response (`created_at`), if any.
pub create_time: Option<std::time::SystemTime>,
}
/// Payload of `SandboxResponse::Platform`.
#[derive(Clone, Debug)]
pub struct PlatformInfo {
/// Operating system name placed into the platform response.
pub os: String,
/// CPU architecture name placed into the platform response.
pub architecture: String,
}
/// Payload of `SandboxRequest::StopSandbox`.
#[derive(Clone, Debug)]
pub struct StopSandboxRequest {
/// Id of the sandbox to stop.
pub sandbox_id: String,
/// Timeout copied from the request's `timeout_secs` field
/// (presumably the time allowed for stopping, in seconds; confirm against the sandbox API).
pub timeout_secs: u32,
}
/// Payload of `SandboxResponse::WaitSandbox`.
#[derive(Clone, Debug, Default)]
pub struct SandboxExitInfo {
/// Exit status reported in the wait response.
pub exit_status: u32,
/// Exit time reported in the wait response, if any.
pub exited_at: Option<std::time::SystemTime>,
}
/// Payload of `SandboxRequest::SandboxStatus`.
#[derive(Clone, Debug)]
pub struct SandboxStatusRequest {
/// Id of the sandbox whose status is queried.
pub sandbox_id: String,
/// `verbose` flag copied from the incoming status request.
pub verbose: bool,
}
/// Payload of `SandboxResponse::SandboxStatus`; its fields are copied one-to-one
/// into the sandbox status response message.
#[derive(Clone, Debug)]
pub struct SandboxStatusInfo {
/// Id of the sandbox the status belongs to.
pub sandbox_id: String,
/// Process id reported in the status response.
pub pid: u32,
/// Sandbox state as a free-form string.
pub state: String,
/// Creation time, if known.
pub created_at: Option<std::time::SystemTime>,
/// Exit time, if the sandbox has exited.
pub exited_at: Option<std::time::SystemTime>,
}
/// Status record of a sandbox.
// NOTE(review): no user of this type is visible in this change; confirm where it is consumed.
#[derive(Default, Clone, Debug)]
pub struct SandboxStatus {
/// Id of the sandbox.
pub sandbox_id: String,
/// Process id associated with the sandbox.
pub pid: u32,
/// Sandbox state as a free-form string.
pub state: String,
/// Additional key/value information about the sandbox.
pub info: std::collections::HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub struct PID {
pub pid: u32,

View File

@ -6,16 +6,26 @@
use super::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest,
ResizePTYRequest, SandboxConfig, SandboxID, SandboxNetworkEnv, SandboxRequest,
SandboxStatusRequest, ShutdownRequest, StopSandboxRequest, TaskRequest, UpdateRequest,
};
use anyhow::{Context, Result};
use containerd_shim_protos::api;
use kata_types::mount::Mount;
use std::{
convert::{From, TryFrom},
path::PathBuf,
};
use protobuf::Message;
use runtime_spec;
use protocols::api as cri_api_v1;
use anyhow::{anyhow, Context, Result};
use containerd_shim_protos::{api, sandbox_api};
pub const SANDBOX_API_V1: &str = "runtime.v1.PodSandboxConfig";
fn trans_from_shim_mount(from: &api::Mount) -> Mount {
let options = from.options.to_vec();
let mut read_only = false;
@ -37,6 +47,112 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount {
}
}
// A CreateSandboxRequest and its embedded PodSandboxConfig carry a lot of information for
// creating a sandbox. At present, only part of it is taken out to build the SandboxConfig.
impl TryFrom<sandbox_api::CreateSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Build a `SandboxRequest::CreateSandbox` from a containerd `CreateSandboxRequest`.
    ///
    /// The request options must be a serialized `PodSandboxConfig` (type url
    /// `SANDBOX_API_V1`). Its hostname, annotations and DNS settings are copied into the
    /// resulting `SandboxConfig`.
    ///
    /// # Errors
    /// Fails if the options have an unsupported type url or cannot be parsed as a
    /// `PodSandboxConfig`.
    fn try_from(from: sandbox_api::CreateSandboxRequest) -> Result<Self> {
        let type_url = from.options.type_url.clone();
        if type_url != SANDBOX_API_V1 {
            return Err(anyhow!("unsupported type url: {}", type_url));
        }

        let config = cri_api_v1::PodSandboxConfig::parse_from_bytes(&from.options.value)?;

        // Flatten the DNS configuration into one list: servers, then search domains,
        // then options. `Vec::append` drains its argument, so each source list must be
        // appended exactly once.
        let mut dns: Vec<String> = vec![];
        config.dns_config.map(|mut dns_config| {
            dns.append(&mut dns_config.servers);
            dns.append(&mut dns_config.searches);
            dns.append(&mut dns_config.options);
        });

        Ok(SandboxRequest::CreateSandbox(Box::new(SandboxConfig {
            sandbox_id: from.sandbox_id.clone(),
            hostname: config.hostname,
            dns,
            network_env: SandboxNetworkEnv {
                netns: Some(from.netns_path),
                network_created: false,
            },
            annotations: config.annotations.clone(),
            hooks: None,
            state: runtime_spec::State {
                version: Default::default(),
                id: from.sandbox_id,
                status: runtime_spec::ContainerState::Creating,
                pid: 0,
                bundle: from.bundle_path,
                annotations: config.annotations,
            },
        })))
    }
}
impl TryFrom<sandbox_api::StartSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wrap the sandbox id of a start request into `SandboxRequest::StartSandbox`.
    fn try_from(from: sandbox_api::StartSandboxRequest) -> Result<Self> {
        let id = SandboxID {
            sandbox_id: from.sandbox_id,
        };
        Ok(Self::StartSandbox(id))
    }
}
impl TryFrom<sandbox_api::PlatformRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wrap the sandbox id of a platform request into `SandboxRequest::Platform`.
    fn try_from(from: sandbox_api::PlatformRequest) -> Result<Self> {
        let id = SandboxID {
            sandbox_id: from.sandbox_id,
        };
        Ok(Self::Platform(id))
    }
}
impl TryFrom<sandbox_api::StopSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Copy the sandbox id and timeout of a stop request into `SandboxRequest::StopSandbox`.
    fn try_from(from: sandbox_api::StopSandboxRequest) -> Result<Self> {
        let stop = StopSandboxRequest {
            sandbox_id: from.sandbox_id,
            timeout_secs: from.timeout_secs,
        };
        Ok(Self::StopSandbox(stop))
    }
}
impl TryFrom<sandbox_api::WaitSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wrap the sandbox id of a wait request into `SandboxRequest::WaitSandbox`.
    fn try_from(from: sandbox_api::WaitSandboxRequest) -> Result<Self> {
        let id = SandboxID {
            sandbox_id: from.sandbox_id,
        };
        Ok(Self::WaitSandbox(id))
    }
}
impl TryFrom<sandbox_api::SandboxStatusRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Copy the sandbox id and `verbose` flag of a status request into
    /// `SandboxRequest::SandboxStatus`.
    fn try_from(from: sandbox_api::SandboxStatusRequest) -> Result<Self> {
        let status = SandboxStatusRequest {
            sandbox_id: from.sandbox_id,
            verbose: from.verbose,
        };
        Ok(Self::SandboxStatus(status))
    }
}
impl TryFrom<sandbox_api::PingRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wrap the sandbox id of a ping request into `SandboxRequest::Ping`.
    fn try_from(from: sandbox_api::PingRequest) -> Result<Self> {
        let id = SandboxID {
            sandbox_id: from.sandbox_id,
        };
        Ok(Self::Ping(id))
    }
}
impl TryFrom<sandbox_api::ShutdownSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wrap the sandbox id of a shutdown request into `SandboxRequest::ShutdownSandbox`.
    fn try_from(from: sandbox_api::ShutdownSandboxRequest) -> Result<Self> {
        let id = SandboxID {
            sandbox_id: from.sandbox_id,
        };
        Ok(Self::ShutdownSandbox(id))
    }
}
impl TryFrom<api::CreateTaskRequest> for TaskRequest {
type Error = anyhow::Error;
fn try_from(from: api::CreateTaskRequest) -> Result<Self> {

View File

@ -10,12 +10,139 @@ use std::{
};
use anyhow::{anyhow, Result};
use containerd_shim_protos::api;
use containerd_shim_protos::{api, sandbox_api};
use super::utils::option_system_time_into;
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse};
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, SandboxResponse, TaskResponse};
use crate::error::Error;
impl TryFrom<SandboxResponse> for sandbox_api::CreateSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::CreateSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::StartSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::StartSandbox(resp) => Ok(Self {
pid: resp.pid,
created_at: option_system_time_into(resp.create_time),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::PlatformResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::Platform(resp) => {
let mut sandbox_resp = Self::new();
sandbox_resp.mut_platform().set_os(resp.os);
sandbox_resp
.mut_platform()
.set_architecture(resp.architecture);
Ok(sandbox_resp)
}
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::StopSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::StopSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::WaitSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::WaitSandbox(resp) => Ok(Self {
exit_status: resp.exit_status,
exited_at: option_system_time_into(resp.exited_at),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::SandboxStatusResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::SandboxStatus(resp) => Ok(Self {
sandbox_id: resp.sandbox_id,
pid: resp.pid,
state: resp.state,
created_at: option_system_time_into(resp.created_at),
exited_at: option_system_time_into(resp.exited_at),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::PingResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::Ping => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::ShutdownSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::ShutdownSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl From<ProcessExitStatus> for api::WaitResponse {
fn from(from: ProcessExitStatus) -> Self {
Self {
@ -38,6 +165,7 @@ impl From<ProcessStatus> for api::Status {
}
}
}
impl From<ProcessStateInfo> for api::StateResponse {
fn from(from: ProcessStateInfo) -> Self {
Self {

View File

@ -7,9 +7,13 @@
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{ContainerProcess, SandboxConfig, TaskRequest, TaskResponse},
types::{
ContainerProcess, PlatformInfo, SandboxConfig, SandboxRequest, SandboxResponse,
StartSandboxInfo, TaskRequest, TaskResponse,
},
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
};
use hypervisor::Param;
use kata_sys_util::{mount::get_mount_path, spec::load_oci_spec};
use kata_types::{
@ -29,9 +33,11 @@ use runtime_spec as spec;
use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER;
use std::{
collections::HashMap,
ops::Deref,
path::{Path, PathBuf},
str::from_utf8,
sync::Arc,
time::SystemTime,
};
use tokio::fs;
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
@ -374,7 +380,24 @@ impl RuntimeHandlerManager {
}
#[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_message(&self, req: TaskRequest) -> Result<TaskResponse> {
/// Entry point for sandbox API requests.
///
/// A `CreateSandbox` request initializes the sandboxed runtime instance from its
/// configuration; every other request is forwarded to `handler_sandbox_request`.
pub async fn handler_sandbox_message(&self, req: SandboxRequest) -> Result<SandboxResponse> {
    match req {
        SandboxRequest::CreateSandbox(sandbox_config) => {
            let config = sandbox_config.deref().clone();
            self.sandbox_init_runtime_instance(config)
                .await
                .context("init sandboxed runtime")?;
            Ok(SandboxResponse::CreateSandbox)
        }
        other => {
            let response = self.handler_sandbox_request(other).await;
            response.context("handler request")
        }
    }
}
#[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_task_message(&self, req: TaskRequest) -> Result<TaskResponse> {
if let TaskRequest::CreateContainer(container_config) = req {
// get oci spec
let bundler_path = format!(
@ -429,14 +452,57 @@ impl RuntimeHandlerManager {
Ok(TaskResponse::CreateContainer(shim_pid))
} else {
self.handler_request(req)
self.handler_task_request(req)
.await
.context("handler TaskRequest")
}
}
/// Dispatch a sandbox request to the sandbox of the current runtime instance.
///
/// `CreateSandbox` is rejected here because `handler_sandbox_message` handles it before
/// delegating to this function.
///
/// # Errors
/// Returns an error if the runtime instance cannot be obtained, if the underlying sandbox
/// operation fails, or if the request is not supported yet (`WaitSandbox`,
/// `SandboxStatus`).
pub async fn handler_sandbox_request(&self, req: SandboxRequest) -> Result<SandboxResponse> {
    let instance = self
        .get_runtime_instance()
        .await
        .context("get runtime instance")?;
    let sandbox = instance.sandbox.clone();

    match req {
        SandboxRequest::CreateSandbox(req) => Err(anyhow!("Unreachable request {:?}", req)),
        SandboxRequest::StartSandbox(_) => {
            sandbox
                .start()
                .await
                .context("start sandbox in sandbox handler")?;

            // The reply reports the pid of the current (shim) process and the time at
            // which the start completed.
            Ok(SandboxResponse::StartSandbox(StartSandboxInfo {
                pid: std::process::id(),
                create_time: Some(SystemTime::now()),
            }))
        }
        SandboxRequest::Platform(_) => Ok(SandboxResponse::Platform(PlatformInfo {
            os: std::env::consts::OS.to_string(),
            architecture: std::env::consts::ARCH.to_string(),
        })),
        SandboxRequest::StopSandbox(_) => {
            sandbox.stop().await.context("stop sandbox")?;

            Ok(SandboxResponse::StopSandbox)
        }
        // Not supported yet: report an error to the caller instead of panicking inside
        // the request handler, since any client can send these requests.
        SandboxRequest::WaitSandbox(_) => Err(anyhow!("WaitSandbox is not implemented yet")),
        SandboxRequest::SandboxStatus(_) => {
            Err(anyhow!("SandboxStatus is not implemented yet"))
        }
        SandboxRequest::Ping(_) => Ok(SandboxResponse::Ping),
        SandboxRequest::ShutdownSandbox(_) => {
            sandbox.shutdown().await.context("shutdown sandbox")?;

            Ok(SandboxResponse::ShutdownSandbox)
        }
    }
}
#[instrument(parent = &(*ROOTSPAN))]
pub async fn handler_request(&self, req: TaskRequest) -> Result<TaskResponse> {
pub async fn handler_task_request(&self, req: TaskRequest) -> Result<TaskResponse> {
let instance = self
.get_runtime_instance()
.await

View File

@ -15,7 +15,7 @@ tracing = "0.1.36"
ttrpc = "0.8.4"
common = { path = "../runtimes/common" }
containerd-shim-protos = { version = "0.6.0", features = ["async"] }
containerd-shim-protos = { version = "0.6.0", features = ["async", "sandbox"] }
containerd-shim = { version = "0.6.0", features = ["async"] }
logging = { path = "../../../libs/logging" }
kata-types = { path = "../../../libs/kata-types" }

View File

@ -42,7 +42,7 @@ impl TaskService {
let logger = sl!().new(o!("stream id" => ctx.mh.stream_id));
debug!(logger, "====> task service {:?}", &r);
let resp =
self.handler.handler_message(r).await.map_err(|err| {
self.handler.handler_task_message(r).await.map_err(|err| {
ttrpc::Error::Others(format!("failed to handle message {:?}", err))
})?;
debug!(logger, "<==== task service {:?}", &resp);

View File

@ -29,7 +29,7 @@ async fn real_main() {
stderr: None,
});
manager.handler_message(req).await.ok();
manager.handler_task_message(req).await.ok();
}
fn main() -> Result<(), Box<dyn std::error::Error>> {