sandbox: Add the sandbox api support

For Kata-Containers, we add SandboxService for these new calls alongside the existing
TaskService, including processing requests and replies, and properly calling
VirtSandbox's interfaces. By splitting the start logic of the sandbox, virt_container
is compatible with calls from the SandboxService and TaskService. In addition, we modify
the processing of resource configuration to solve the problem that SandboxService does not
have a spec file when creating a pod.

The sandbox API is supported starting with containerd 1.7, but it is enabled differently in containerd 2.0.
To enable it in 2.0, turn on the sandbox API for a specific runtime by adding
 sandboxer = "shim" to that runtime's configuration. Taking the kata runtime as an example:

[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.kata]
          runtime_type = "io.containerd.kata.v2"
          sandboxer = "shim"
          privileged_without_host_devices = true
          pod_annotations = ["io.katacontainers.*"]

For containerd version 1.7, you can enable it by:

1: add env ENABLE_CRI_SANDBOXES=true
2: add sandbox_mode = "shim" to runtime config.

Acknowledgement

This work was based on @wllenyj's POC code:
(f5b62a2d7c)

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
Signed-off-by: wllenyj <wllenyj@linux.alibaba.com>
This commit is contained in:
Fupan Li 2024-09-18 14:19:20 +08:00
parent e3365ca348
commit 8976fa8c5d
20 changed files with 591 additions and 71 deletions

View File

@ -37,8 +37,10 @@ fn get_uds_with_sid(short_id: &str, path: &str) -> Result<String> {
return Ok(format!("unix://{}", p.display()));
}
let _ = fs::create_dir_all(kata_run_path.join(short_id))
.context(format!("failed to create directory {:?}", kata_run_path.join(short_id)));
let _ = fs::create_dir_all(kata_run_path.join(short_id)).context(format!(
"failed to create directory {:?}",
kata_run_path.join(short_id)
));
let target_ids: Vec<String> = fs::read_dir(&kata_run_path)?
.filter_map(|e| {
@ -71,8 +73,11 @@ fn get_uds_with_sid(short_id: &str, path: &str) -> Result<String> {
}
// return sandbox's storage path
pub fn sb_storage_path() -> String {
String::from(KATA_PATH)
pub fn sb_storage_path() -> Result<&'static str> {
// make sure the path exists
std::fs::create_dir_all(KATA_PATH).context(format!("failed to create dir: {}", KATA_PATH))?;
Ok(KATA_PATH)
}
// returns the address of the unix domain socket(UDS) for communication with shim
@ -85,7 +90,7 @@ pub fn mgmt_socket_addr(sid: &str) -> Result<String> {
));
}
get_uds_with_sid(sid, &sb_storage_path())
get_uds_with_sid(sid, sb_storage_path()?)
}
#[cfg(test)]

View File

@ -29,6 +29,7 @@ use tokio::sync::RwLock;
use crate::ResourceUpdateOp;
const OS_ERROR_NO_SUCH_PROCESS: i32 = 3;
const SANDBOXED_CGROUP_PATH: &str = "kata_sandboxed_pod";
pub struct CgroupArgs {
pub sid: String,
@ -44,9 +45,8 @@ pub struct CgroupConfig {
impl CgroupConfig {
fn new(sid: &str, toml_config: &TomlConfig) -> Result<Self> {
let overhead_path = utils::gen_overhead_path(sid);
let spec = load_oci_spec()?;
let path = spec
.linux()
let path = if let Ok(spec) = load_oci_spec() {
spec.linux()
.clone()
.and_then(|linux| linux.cgroups_path().clone())
.map(|path| {
@ -56,7 +56,10 @@ impl CgroupConfig {
.trim_start_matches('/')
.to_string()
})
.unwrap_or_default();
.unwrap_or_default()
} else {
format!("{}/{}", SANDBOXED_CGROUP_PATH, sid)
};
Ok(Self {
path,

View File

@ -67,9 +67,9 @@ impl ResourceManagerInner {
.await
.context("failed to create device manager")?;
let cgroups_resource = CgroupsResource::new(sid, &toml_config)?;
let cpu_resource = CpuResource::new(toml_config.clone())?;
let mem_resource = MemResource::new(init_size_manager)?;
let cgroups_resource = CgroupsResource::new(sid, &toml_config).context("load cgroup")?;
let cpu_resource = CpuResource::new(toml_config.clone()).context("load cpu resource")?;
let mem_resource = MemResource::new(init_size_manager).context("load memory resources")?;
Ok(Self {
sid: sid.to_string(),
toml_config,

View File

@ -28,3 +28,4 @@ kata-types = { path = "../../../../libs/kata-types" }
runtime-spec = { path = "../../../../libs/runtime-spec" }
oci-spec = { version = "0.6.8", features = ["runtime"] }
resource = { path = "../../resource" }
protocols = { path = "../../../../libs/protocols"}

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::types::{ContainerProcess, TaskResponse};
use crate::types::{ContainerProcess, SandboxResponse, TaskResponse};
#[derive(thiserror::Error, Debug)]
pub enum Error {
@ -12,6 +12,8 @@ pub enum Error {
ContainerNotFound(String),
#[error("failed to find process {0}")]
ProcessNotFound(ContainerProcess),
#[error("unexpected response {0} to shim {1}")]
#[error("unexpected task response {0} to shim {1}")]
UnexpectedResponse(TaskResponse, String),
#[error("unexpected sandbox response {0} to shim {1}")]
UnexpectedSandboxResponse(SandboxResponse, String),
}

View File

@ -11,5 +11,5 @@ pub mod message;
mod runtime_handler;
pub use runtime_handler::{RuntimeHandler, RuntimeInstance};
mod sandbox;
pub use sandbox::{Sandbox, SandboxNetworkEnv};
pub use sandbox::{Sandbox, SandboxNetworkEnv, SandboxStatus};
pub mod types;

View File

@ -4,7 +4,10 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::{types::ContainerProcess, ContainerManager};
use crate::{
types::{ContainerProcess, SandboxExitInfo},
ContainerManager,
};
use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
@ -15,6 +18,14 @@ pub struct SandboxNetworkEnv {
pub network_created: bool,
}
/// Status snapshot of a sandbox, as returned by `Sandbox::status`.
#[derive(Default, Clone, Debug)]
pub struct SandboxStatus {
/// Identifier of the sandbox this status describes.
pub sandbox_id: String,
/// Process id reported for the sandbox.
/// NOTE(review): the VirtSandbox implementation fills this with the current
/// (shim) process id — confirm that is what callers expect.
pub pid: u32,
/// Textual name of the sandbox state.
pub state: String,
/// Additional key/value details; may be left empty by implementations.
pub info: std::collections::HashMap<String, String>,
}
impl std::fmt::Debug for SandboxNetworkEnv {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SandboxNetworkEnv")
@ -27,6 +38,8 @@ impl std::fmt::Debug for SandboxNetworkEnv {
#[async_trait]
pub trait Sandbox: Send + Sync {
async fn start(&self) -> Result<()>;
async fn status(&self) -> Result<SandboxStatus>;
async fn wait(&self) -> Result<SandboxExitInfo>;
async fn stop(&self) -> Result<()>;
async fn cleanup(&self) -> Result<()>;
async fn shutdown(&self) -> Result<()>;

View File

@ -141,6 +141,32 @@ pub struct ContainerConfig {
pub stderr: Option<String>,
}
/// Request: shim to sandbox
/// Each variant corresponds to one call of the sandbox API and carries the
/// data translated from the matching shim request message.
#[derive(Debug, Clone, Display)]
pub enum SandboxRequest {
/// Create a sandbox; carries the (boxed) sandbox configuration.
CreateSandbox(Box<SandboxConfig>),
/// Start the sandbox identified by the given id.
StartSandbox(SandboxID),
/// Query the platform (OS and architecture) of the sandbox.
Platform(SandboxID),
/// Stop the sandbox; carries the sandbox id and a timeout in seconds.
StopSandbox(StopSandboxRequest),
/// Wait for the sandbox to exit.
WaitSandbox(SandboxID),
/// Query the status of the sandbox.
SandboxStatus(SandboxStatusRequest),
/// Liveness check.
Ping(SandboxID),
/// Shut the sandbox down.
ShutdownSandbox(SandboxID),
}
/// Response: sandbox response to shim
/// Request and Response messages need to be paired
/// (each variant answers the `SandboxRequest` variant of the same name).
#[derive(Debug, Clone, Display)]
pub enum SandboxResponse {
/// Reply to `CreateSandbox`; carries no data.
CreateSandbox,
/// Reply to `StartSandbox`; carries the pid and creation time.
StartSandbox(StartSandboxInfo),
/// Reply to `Platform`; carries the OS and architecture.
Platform(PlatformInfo),
/// Reply to `StopSandbox`; carries no data.
StopSandbox,
/// Reply to `WaitSandbox`; carries the exit status and exit time.
WaitSandbox(SandboxExitInfo),
/// Reply to `SandboxStatus`; carries the status details.
SandboxStatus(SandboxStatusInfo),
/// Reply to `Ping`; carries no data.
Ping,
/// Reply to `ShutdownSandbox`; carries no data.
ShutdownSandbox,
}
#[derive(Clone, Debug)]
pub struct SandboxConfig {
pub sandbox_id: String,
@ -152,6 +178,50 @@ pub struct SandboxConfig {
pub state: runtime_spec::State,
}
/// Payload of sandbox requests that only need to name their target sandbox
/// (start, platform, wait, ping and shutdown).
#[derive(Clone, Debug)]
pub struct SandboxID {
/// Identifier of the sandbox the request refers to.
pub sandbox_id: String,
}
/// Data returned to the shim after a sandbox has been started.
#[derive(Clone, Debug)]
pub struct StartSandboxInfo {
/// Process id reported for the started sandbox.
pub pid: u32,
/// Creation time of the sandbox, if known.
pub create_time: Option<std::time::SystemTime>,
}
/// Platform description returned in reply to a platform request.
#[derive(Clone, Debug)]
pub struct PlatformInfo {
/// Operating system name.
pub os: String,
/// CPU architecture name.
pub architecture: String,
}
/// Payload of a stop-sandbox request.
#[derive(Clone, Debug)]
pub struct StopSandboxRequest {
/// Identifier of the sandbox to stop.
pub sandbox_id: String,
/// Timeout, in seconds, copied from the shim request.
pub timeout_secs: u32,
}
/// Exit information of a sandbox, produced by `Sandbox::wait`.
#[derive(Clone, Debug, Default)]
pub struct SandboxExitInfo {
/// Exit status of the sandbox.
pub exit_status: u32,
/// Time at which the sandbox exited, if known.
pub exited_at: Option<std::time::SystemTime>,
}
/// Payload of a sandbox-status request.
#[derive(Clone, Debug)]
pub struct SandboxStatusRequest {
/// Identifier of the sandbox whose status is requested.
pub sandbox_id: String,
/// Verbosity flag copied from the shim request.
pub verbose: bool,
}
/// Status details returned to the shim in reply to a sandbox-status request.
#[derive(Clone, Debug)]
pub struct SandboxStatusInfo {
/// Identifier of the sandbox.
pub sandbox_id: String,
/// Process id reported for the sandbox.
pub pid: u32,
/// Textual name of the sandbox state.
pub state: String,
/// Creation time of the sandbox, if known.
pub created_at: Option<std::time::SystemTime>,
/// Exit time of the sandbox, if it has exited and the time is known.
pub exited_at: Option<std::time::SystemTime>,
}
#[derive(Debug, Clone)]
pub struct PID {
pub pid: u32,

View File

@ -6,16 +6,25 @@
use super::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest,
ResizePTYRequest, SandboxConfig, SandboxID, SandboxNetworkEnv, SandboxRequest,
SandboxStatusRequest, ShutdownRequest, StopSandboxRequest, TaskRequest, UpdateRequest,
};
use anyhow::{Context, Result};
use containerd_shim_protos::api;
use kata_types::mount::Mount;
use std::{
convert::{From, TryFrom},
path::PathBuf,
};
use protobuf::Message;
use runtime_spec;
use protocols::api as cri_api_v1;
use anyhow::{anyhow, Context, Result};
use containerd_shim_protos::{api, sandbox_api};
pub const SANDBOX_API_V1: &str = "runtime.v1.PodSandboxConfig";
fn trans_from_shim_mount(from: &api::Mount) -> Mount {
let options = from.options.to_vec();
let mut read_only = false;
@ -37,6 +46,112 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount {
}
}
// A CreateSandboxRequest and the PodSandboxConfig embedded in its options carry a lot of
// information for creating a sandbox. At present only part of it is used to build the SandboxConfig.
impl TryFrom<sandbox_api::CreateSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Builds a [`SandboxRequest::CreateSandbox`] from a shim `CreateSandboxRequest`.
    ///
    /// The request options must be a serialized `PodSandboxConfig` whose type url equals
    /// [`SANDBOX_API_V1`]. The hostname, DNS settings and annotations are taken from that
    /// config; the sandbox id, netns path and bundle path come from the request itself.
    ///
    /// # Errors
    ///
    /// Returns an error when the options type url is not supported or when the embedded
    /// `PodSandboxConfig` cannot be decoded.
    fn try_from(from: sandbox_api::CreateSandboxRequest) -> Result<Self> {
        if from.options.type_url != SANDBOX_API_V1 {
            return Err(anyhow!(
                "unsupported type url: {}",
                from.options.type_url
            ));
        }

        let config = cri_api_v1::PodSandboxConfig::parse_from_bytes(&from.options.value)?;

        // Collect every DNS setting of the pod: name servers, search domains and options.
        // `Vec::append` drains its argument, so each list has to be appended exactly once.
        let mut dns: Vec<String> = vec![];
        config.dns_config.map(|mut dns_config| {
            dns.append(&mut dns_config.servers);
            dns.append(&mut dns_config.searches);
            dns.append(&mut dns_config.options);
        });

        Ok(SandboxRequest::CreateSandbox(Box::new(SandboxConfig {
            sandbox_id: from.sandbox_id.clone(),
            hostname: config.hostname,
            dns,
            network_env: SandboxNetworkEnv {
                netns: Some(from.netns_path),
                network_created: false,
            },
            annotations: config.annotations.clone(),
            hooks: None,
            // No container process exists at this point, so the state starts as
            // `Creating` with a pid of 0.
            state: runtime_spec::State {
                version: Default::default(),
                id: from.sandbox_id,
                status: runtime_spec::ContainerState::Creating,
                pid: 0,
                bundle: from.bundle_path,
                annotations: config.annotations,
            },
        })))
    }
}
impl TryFrom<sandbox_api::StartSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wraps the sandbox id of a shim start request in [`SandboxRequest::StartSandbox`].
    fn try_from(from: sandbox_api::StartSandboxRequest) -> Result<Self> {
        let sandbox_id = from.sandbox_id;
        Ok(Self::StartSandbox(SandboxID { sandbox_id }))
    }
}
impl TryFrom<sandbox_api::PlatformRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wraps the sandbox id of a shim platform request in [`SandboxRequest::Platform`].
    fn try_from(from: sandbox_api::PlatformRequest) -> Result<Self> {
        let sandbox_id = from.sandbox_id;
        Ok(Self::Platform(SandboxID { sandbox_id }))
    }
}
impl TryFrom<sandbox_api::StopSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Copies the sandbox id and timeout of a shim stop request into
    /// [`SandboxRequest::StopSandbox`].
    fn try_from(from: sandbox_api::StopSandboxRequest) -> Result<Self> {
        let stop_request = StopSandboxRequest {
            sandbox_id: from.sandbox_id,
            timeout_secs: from.timeout_secs,
        };
        Ok(Self::StopSandbox(stop_request))
    }
}
impl TryFrom<sandbox_api::WaitSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wraps the sandbox id of a shim wait request in [`SandboxRequest::WaitSandbox`].
    fn try_from(from: sandbox_api::WaitSandboxRequest) -> Result<Self> {
        let sandbox_id = from.sandbox_id;
        Ok(Self::WaitSandbox(SandboxID { sandbox_id }))
    }
}
impl TryFrom<sandbox_api::SandboxStatusRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Copies the sandbox id and verbosity flag of a shim status request into
    /// [`SandboxRequest::SandboxStatus`].
    fn try_from(from: sandbox_api::SandboxStatusRequest) -> Result<Self> {
        let status_request = SandboxStatusRequest {
            sandbox_id: from.sandbox_id,
            verbose: from.verbose,
        };
        Ok(Self::SandboxStatus(status_request))
    }
}
impl TryFrom<sandbox_api::PingRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wraps the sandbox id of a shim ping request in [`SandboxRequest::Ping`].
    fn try_from(from: sandbox_api::PingRequest) -> Result<Self> {
        let sandbox_id = from.sandbox_id;
        Ok(Self::Ping(SandboxID { sandbox_id }))
    }
}
impl TryFrom<sandbox_api::ShutdownSandboxRequest> for SandboxRequest {
    type Error = anyhow::Error;

    /// Wraps the sandbox id of a shim shutdown request in
    /// [`SandboxRequest::ShutdownSandbox`].
    fn try_from(from: sandbox_api::ShutdownSandboxRequest) -> Result<Self> {
        let sandbox_id = from.sandbox_id;
        Ok(Self::ShutdownSandbox(SandboxID { sandbox_id }))
    }
}
impl TryFrom<api::CreateTaskRequest> for TaskRequest {
type Error = anyhow::Error;
fn try_from(from: api::CreateTaskRequest) -> Result<Self> {

View File

@ -10,10 +10,10 @@ use std::{
};
use anyhow::{anyhow, Result};
use containerd_shim_protos::api;
use containerd_shim_protos::{api, sandbox_api};
use super::utils::option_system_time_into;
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse};
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, SandboxResponse, TaskResponse};
use crate::error::Error;
impl From<ProcessExitStatus> for api::WaitResponse {
@ -26,6 +26,133 @@ impl From<ProcessExitStatus> for api::WaitResponse {
}
}
impl TryFrom<SandboxResponse> for sandbox_api::CreateSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::CreateSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::StartSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::StartSandbox(resp) => Ok(Self {
pid: resp.pid,
created_at: option_system_time_into(resp.create_time),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::PlatformResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::Platform(resp) => {
let mut sandbox_resp = Self::new();
sandbox_resp.mut_platform().set_os(resp.os);
sandbox_resp
.mut_platform()
.set_architecture(resp.architecture);
Ok(sandbox_resp)
}
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::StopSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::StopSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::WaitSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::WaitSandbox(resp) => Ok(Self {
exit_status: resp.exit_status,
exited_at: option_system_time_into(resp.exited_at),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::SandboxStatusResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::SandboxStatus(resp) => Ok(Self {
sandbox_id: resp.sandbox_id,
pid: resp.pid,
state: resp.state,
created_at: option_system_time_into(resp.created_at),
exited_at: option_system_time_into(resp.exited_at),
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::PingResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::Ping => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<SandboxResponse> for sandbox_api::ShutdownSandboxResponse {
type Error = anyhow::Error;
fn try_from(from: SandboxResponse) -> Result<Self> {
match from {
SandboxResponse::ShutdownSandbox => Ok(Self::new()),
_ => Err(anyhow!(Error::UnexpectedSandboxResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl From<ProcessStatus> for api::Status {
fn from(from: ProcessStatus) -> Self {
match from {

View File

@ -7,7 +7,10 @@
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{ContainerProcess, SandboxConfig, TaskRequest, TaskResponse},
types::{
ContainerProcess, PlatformInfo, SandboxConfig, SandboxRequest, SandboxResponse,
SandboxStatusInfo, StartSandboxInfo, TaskRequest, TaskResponse,
},
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
};
use hypervisor::Param;
@ -29,9 +32,11 @@ use runtime_spec as spec;
use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER;
use std::{
collections::HashMap,
ops::Deref,
path::{Path, PathBuf},
str::from_utf8,
sync::Arc,
time::SystemTime,
};
use tokio::fs;
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
@ -138,14 +143,6 @@ impl RuntimeHandlerManagerInner {
Ok(())
}
#[instrument]
async fn start_runtime_handler(&self) -> Result<()> {
if let Some(instance) = self.runtime_instance.as_ref() {
instance.sandbox.start().await.context("start sandbox")?;
}
Ok(())
}
#[instrument]
async fn try_init(
&mut self,
@ -200,10 +197,6 @@ impl RuntimeHandlerManagerInner {
.await
.context("init runtime handler")?;
self.start_runtime_handler()
.await
.context("start runtime handler")?;
// the sandbox creation can reach here only once and the sandbox is created
// so we can safely create the shim management socket right now
// the unwrap here is safe because the runtime handler is correctly created
@ -318,6 +311,14 @@ impl RuntimeHandlerManager {
state: &spec::State,
options: &Option<Vec<u8>>,
) -> Result<()> {
let mut inner: tokio::sync::RwLockWriteGuard<'_, RuntimeHandlerManagerInner> =
self.inner.write().await;
// return if runtime instance has init
if inner.runtime_instance.is_some() {
return Ok(());
}
let mut dns: Vec<String> = vec![];
let spec_mounts = spec.mounts().clone().unwrap_or_default();
@ -357,9 +358,6 @@ impl RuntimeHandlerManager {
network_created,
};
let mut inner: tokio::sync::RwLockWriteGuard<'_, RuntimeHandlerManagerInner> =
self.inner.write().await;
let sandbox_config = SandboxConfig {
sandbox_id: inner.id.clone(),
dns,
@ -374,18 +372,30 @@ impl RuntimeHandlerManager {
}
#[instrument]
async fn init_runtime_instance(
&self,
sandbox_config: SandboxConfig,
spec: Option<&oci::Spec>,
options: &Option<Vec<u8>>,
) -> Result<()> {
async fn sandbox_init_runtime_instance(&self, sandbox_config: SandboxConfig) -> Result<()> {
let mut inner = self.inner.write().await;
inner.try_init(sandbox_config, spec, options).await
inner.try_init(sandbox_config, None, &None).await
}
#[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_message(&self, req: TaskRequest) -> Result<TaskResponse> {
pub async fn handler_sandbox_message(&self, req: SandboxRequest) -> Result<SandboxResponse> {
    // Sandbox creation is the only request that may arrive before a runtime instance
    // exists, so it is served here; everything else is forwarded to the request handler.
    match req {
        SandboxRequest::CreateSandbox(sandbox_config) => {
            let config = sandbox_config.deref().clone();
            self.sandbox_init_runtime_instance(config)
                .await
                .context("init sandboxed runtime")?;

            Ok(SandboxResponse::CreateSandbox)
        }
        other => self
            .handler_sandbox_request(other)
            .await
            .context("handler request"),
    }
}
#[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_task_message(&self, req: TaskRequest) -> Result<TaskResponse> {
if let TaskRequest::CreateContainer(container_config) = req {
// get oci spec
let bundler_path = format!(
@ -406,11 +416,17 @@ impl RuntimeHandlerManager {
self.task_init_runtime_instance(&spec, &state, &container_config.options)
.await
.context("try init runtime instance")?;
let instance = self
let instance: Arc<RuntimeInstance> = self
.get_runtime_instance()
.await
.context("get runtime instance")?;
instance
.sandbox
.start()
.await
.context("start sandbox in task handler")?;
let container_id = container_config.container_id.clone();
let shim_pid = instance
.container_manager
@ -434,14 +450,67 @@ impl RuntimeHandlerManager {
Ok(TaskResponse::CreateContainer(shim_pid))
} else {
self.handler_request(req)
self.handler_task_request(req)
.await
.context("handler TaskRequest")
}
}
/// Serves a sandbox request against the already initialized runtime instance.
///
/// Fails when no runtime instance is available, when the underlying sandbox call fails,
/// or when a `CreateSandbox` request is passed in (creation is handled by the caller).
pub async fn handler_sandbox_request(&self, req: SandboxRequest) -> Result<SandboxResponse> {
    let instance = self
        .get_runtime_instance()
        .await
        .context("get runtime instance")?;
    let sandbox = instance.sandbox.clone();

    let response = match req {
        SandboxRequest::CreateSandbox(req) => {
            return Err(anyhow!("Unreachable request {:?}", req));
        }
        SandboxRequest::StartSandbox(_) => {
            sandbox
                .start()
                .await
                .context("start sandbox in sandbox handler")?;

            // The reply reports the current process id and the current time.
            let pid = std::process::id();
            let create_time = Some(SystemTime::now());
            SandboxResponse::StartSandbox(StartSandboxInfo { pid, create_time })
        }
        SandboxRequest::Platform(_) => {
            let os = std::env::consts::OS.to_string();
            let architecture = std::env::consts::ARCH.to_string();
            SandboxResponse::Platform(PlatformInfo { os, architecture })
        }
        SandboxRequest::StopSandbox(_) => {
            sandbox.stop().await.context("stop sandbox")?;
            SandboxResponse::StopSandbox
        }
        SandboxRequest::WaitSandbox(_) => {
            let exit_info = sandbox.wait().await.context("wait sandbox")?;
            SandboxResponse::WaitSandbox(exit_info)
        }
        SandboxRequest::SandboxStatus(_) => {
            let status = sandbox.status().await?;
            // Creation and exit timestamps are not reported here.
            SandboxResponse::SandboxStatus(SandboxStatusInfo {
                sandbox_id: status.sandbox_id,
                pid: status.pid,
                state: status.state,
                created_at: None,
                exited_at: None,
            })
        }
        SandboxRequest::Ping(_) => SandboxResponse::Ping,
        SandboxRequest::ShutdownSandbox(_) => {
            sandbox.shutdown().await.context("shutdown sandbox")?;
            SandboxResponse::ShutdownSandbox
        }
    };

    Ok(response)
}
#[instrument(parent = &(*ROOTSPAN))]
pub async fn handler_request(&self, req: TaskRequest) -> Result<TaskResponse> {
pub async fn handler_task_request(&self, req: TaskRequest) -> Result<TaskResponse> {
let instance = self
.get_runtime_instance()
.await

View File

@ -37,6 +37,7 @@ runtime-spec = { path = "../../../../libs/runtime-spec" }
oci-spec = { version = "0.6.8", features = ["runtime"] }
persist = { path = "../../persist"}
resource = { path = "../../resource" }
strum = { version = "0.24.0", features = ["derive"] }
[features]
default = ["cloud-hypervisor"]

View File

@ -14,7 +14,10 @@ use async_trait::async_trait;
use common::message::{Action, Message};
use common::types::utils::option_system_time_into;
use common::types::ContainerProcess;
use common::{types::SandboxConfig, ContainerManager, Sandbox, SandboxNetworkEnv};
use common::{
types::{SandboxConfig, SandboxExitInfo},
ContainerManager, Sandbox, SandboxNetworkEnv, SandboxStatus,
};
use containerd_shim_protos::events::task::{TaskExit, TaskOOM};
use hypervisor::VsockConfig;
#[cfg(not(target_arch = "s390x"))]
@ -38,6 +41,7 @@ use resource::network::{dan_config_path, DanNetworkConfig, NetworkConfig, Networ
use resource::{ResourceConfig, ResourceManager};
use runtime_spec as spec;
use std::sync::Arc;
use strum::Display;
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use tracing::instrument;
@ -51,7 +55,7 @@ pub struct SandboxRestoreArgs {
pub sender: Sender<Message>,
}
#[derive(Clone, Copy, PartialEq, Debug)]
#[derive(Clone, Copy, PartialEq, Debug, Display)]
pub enum SandboxState {
Init,
Running,
@ -485,6 +489,28 @@ impl Sandbox for VirtSandbox {
Ok(())
}
/// Reports the sandbox id, the current process id and the name of the current
/// sandbox state; all remaining status fields keep their default values.
async fn status(&self) -> Result<SandboxStatus> {
    info!(sl!(), "get sandbox status");

    let inner = self.inner.read().await;
    let status = SandboxStatus {
        sandbox_id: self.sid.clone(),
        pid: std::process::id(),
        state: inner.state.to_string(),
        ..Default::default()
    };

    Ok(status)
}
/// Waits for the VM to exit and reports its exit code together with the time at
/// which the wait returned.
async fn wait(&self) -> Result<SandboxExitInfo> {
    info!(sl!(), "wait sandbox");

    let exit_status = self.hypervisor.wait_vm().await.context("wait vm")? as u32;
    let exited_at = Some(std::time::SystemTime::now());

    Ok(SandboxExitInfo {
        exit_status,
        exited_at,
    })
}
async fn stop(&self) -> Result<()> {
let mut sandbox_inner = self.inner.write().await;

View File

@ -15,7 +15,7 @@ tracing = "0.1.36"
ttrpc = "0.8"
common = { path = "../runtimes/common" }
containerd-shim-protos = { version = "0.6.0", features = ["async"] }
containerd-shim-protos = { version = "0.6.0", features = ["async", "sandbox"] }
containerd-shim = { version = "0.6.0", features = ["async"] }
logging = { path = "../../../libs/logging" }
kata-types = { path = "../../../libs/kata-types" }

View File

@ -11,6 +11,7 @@ logging::logger_with_subsystem!(sl, "service");
mod event;
mod manager;
mod sandbox_service;
mod task_service;
pub use manager::ServiceManager;

View File

@ -17,7 +17,9 @@ use tokio::sync::mpsc::{channel, Receiver};
use ttrpc::asynchronous::Server;
use crate::event::{new_event_publisher, Forwarder};
use crate::sandbox_service::SandboxService;
use crate::task_service::TaskService;
use containerd_shim_protos::sandbox_async;
/// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8;
@ -136,25 +138,29 @@ impl ServiceManager {
}
fn registry_service(&mut self) -> Result<()> {
if let Some(t) = self.server.take() {
if let Some(s) = self.server.take() {
let sandbox_service = Arc::new(Box::new(SandboxService::new(self.handler.clone()))
as Box<dyn sandbox_async::Sandbox + Send + Sync>);
let s = s.register_service(sandbox_async::create_sandbox(sandbox_service));
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let t = t.register_service(shim_async::create_task(task_service));
self.server = Some(t);
let s = s.register_service(shim_async::create_task(task_service));
self.server = Some(s);
}
Ok(())
}
async fn start_service(&mut self) -> Result<()> {
if let Some(t) = self.server.as_mut() {
t.start().await.context("task server start")?;
if let Some(s) = self.server.as_mut() {
s.start().await.context("task server start")?;
}
Ok(())
}
async fn stop_service(&mut self) -> Result<()> {
if let Some(t) = self.server.as_mut() {
t.stop_listen().await;
if let Some(s) = self.server.as_mut() {
s.stop_listen().await;
}
Ok(())
}

View File

@ -0,0 +1,76 @@
// Copyright (c) 2019-2024 Alibaba Cloud
// Copyright (c) 2019-2024 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use async_trait::async_trait;
use common::types::{SandboxRequest, SandboxResponse};
use containerd_shim_protos::{sandbox_api, sandbox_async};
use runtimes::RuntimeHandlerManager;
use ttrpc::{self, r#async::TtrpcContext};
pub(crate) struct SandboxService {
handler: Arc<RuntimeHandlerManager>,
}
impl SandboxService {
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
Self { handler }
}
async fn handler_message<TtrpcReq, TtrpcResp>(
&self,
ctx: &TtrpcContext,
req: TtrpcReq,
) -> ttrpc::Result<TtrpcResp>
where
SandboxRequest: TryFrom<TtrpcReq>,
<SandboxRequest as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
TtrpcResp: TryFrom<SandboxResponse>,
<TtrpcResp as TryFrom<SandboxResponse>>::Error: std::fmt::Debug,
{
let r = req.try_into().map_err(|err| {
ttrpc::Error::Others(format!("failed to translate from shim {:?}", err))
})?;
let logger = sl!().new(o!("stream id" => ctx.mh.stream_id));
debug!(logger, "====> sandbox service {:?}", &r);
let resp = self
.handler
.handler_sandbox_message(r)
.await
.map_err(|err| {
ttrpc::Error::Others(format!("failed to handle sandbox message {:?}", err))
})?;
debug!(logger, "<==== sandbox service {:?}", &resp);
resp.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))
}
}
// Generates the `sandbox_async::Sandbox` implementation for `SandboxService`.
// Each `name | request type | response type` entry expands to one async trait method
// that simply delegates to `SandboxService::handler_message`.
macro_rules! impl_service {
($($name: tt | $req: ty | $resp: ty),*) => {
#[async_trait]
impl sandbox_async::Sandbox for SandboxService {
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
self.handler_message(ctx, req).await
})*
}
};
}
// One entry per sandbox API method, paired with its request and response message types.
impl_service!(
create_sandbox | sandbox_api::CreateSandboxRequest | sandbox_api::CreateSandboxResponse,
start_sandbox | sandbox_api::StartSandboxRequest | sandbox_api::StartSandboxResponse,
platform | sandbox_api::PlatformRequest | sandbox_api::PlatformResponse,
stop_sandbox | sandbox_api::StopSandboxRequest | sandbox_api::StopSandboxResponse,
wait_sandbox | sandbox_api::WaitSandboxRequest | sandbox_api::WaitSandboxResponse,
sandbox_status | sandbox_api::SandboxStatusRequest | sandbox_api::SandboxStatusResponse,
ping_sandbox | sandbox_api::PingRequest | sandbox_api::PingResponse,
shutdown_sandbox | sandbox_api::ShutdownSandboxRequest | sandbox_api::ShutdownSandboxResponse
);

View File

@ -42,7 +42,7 @@ impl TaskService {
let logger = sl!().new(o!("stream id" => ctx.mh.stream_id));
debug!(logger, "====> task service {:?}", &r);
let resp =
self.handler.handler_message(r).await.map_err(|err| {
self.handler.handler_task_message(r).await.map_err(|err| {
ttrpc::Error::Others(format!("failed to handle message {:?}", err))
})?;
debug!(logger, "<==== task service {:?}", &resp);

View File

@ -29,7 +29,7 @@ async fn real_main() {
stderr: None,
});
manager.handler_message(req).await.ok();
manager.handler_task_message(req).await.ok();
}
fn main() -> Result<(), Box<dyn std::error::Error>> {

View File

@ -34,8 +34,13 @@ impl ShimExecutor {
fn do_start(&mut self) -> Result<PathBuf> {
let bundle_path = get_bundle_path().context("get bundle path")?;
let spec = self.load_oci_spec(&bundle_path)?;
let (container_type, id) = k8s::container_type_with_id(&spec);
let mut container_type = ContainerType::PodSandbox;
let mut id = None;
if let Ok(spec) = self.load_oci_spec(&bundle_path) {
(container_type, id) = k8s::container_type_with_id(&spec);
}
match container_type {
ContainerType::PodSandbox | ContainerType::SingleContainer => {