runtime-rs: service and runtime framework

1. service: Responsible for processing services, such as task service, image service
2. Responsible for implementing different runtimes, such as Virt-container,
Linux-container, Wasm-container

Fixes: #3785
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou 2021-12-03 18:53:48 +08:00 committed by Fupan Li
parent 4296e3069f
commit bdfee005fa
37 changed files with 1713 additions and 83 deletions

View File

@ -142,7 +142,7 @@ impl AgentService {
) -> Result<()> {
let cid = req.container_id.clone();
kata_sys_util::validate::verify_cid(&cid)?;
kata_sys_util::validate::verify_id(&cid)?;
let mut oci_spec = req.OCI.clone();
let use_sandbox_pidns = req.get_sandbox_pidns();

View File

@ -10,11 +10,11 @@ pub enum Error {
InvalidContainerID(String),
}
// A container ID must match this regex:
// A container ID or exec ID must match this regex:
//
// ^[a-zA-Z0-9][a-zA-Z0-9_.-]+$
//
pub fn verify_cid(id: &str) -> Result<(), Error> {
pub fn verify_id(id: &str) -> Result<(), Error> {
let mut chars = id.chars();
let valid = match chars.next() {
@ -253,7 +253,7 @@ mod tests {
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let result = verify_cid(d.id);
let result = verify_id(d.id);
let msg = format!("{}, result: {:?}", msg, result);

View File

@ -13,6 +13,10 @@ use crate::{eother, resolve_path, validate_path};
/// Kata runtime configuration information.
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct Runtime {
/// Runtime name: Plan to support virt-container, linux-container, wasm-container
#[serde(default)]
pub name: String,
/// If enabled, the runtime will log additional debug messages to the system log.
#[serde(default, rename = "enable_debug")]
pub debug: bool,
@ -238,6 +242,7 @@ vfio_mode = "guest_kernel"
fn test_config() {
let content = r#"
[runtime]
name = "virt-container"
enable_debug = true
experimental = ["a", "b"]
internetworking_model = "macvtap"
@ -255,6 +260,7 @@ field_should_be_ignored = true
"#;
let config: TomlConfig = TomlConfig::load(content).unwrap();
config.validate().unwrap();
assert_eq!(&config.runtime.name, "virt-container");
assert!(config.runtime.debug);
assert_eq!(config.runtime.experimental.len(), 2);
assert_eq!(&config.runtime.experimental[0], "a");

View File

@ -17,6 +17,17 @@ mod log_writer;
pub use file_rotate::FileRotator;
pub use log_writer::LogWriter;
#[macro_export]
macro_rules! logger_with_subsystem {
($name: ident, $subsystem: expr) => {
macro_rules! $name {
() => {
slog_scope::logger().new(slog::o!("subsystem" => $subsystem))
};
}
};
}
const LOG_LEVELS: &[(&str, slog::Level)] = &[
("trace", slog::Level::Trace),
("debug", slog::Level::Debug),

View File

@ -14,6 +14,8 @@ use std::collections::HashMap;
mod serialize;
pub use serialize::{to_string, to_writer, Error, Result};
pub const OCI_SPEC_CONFIG_FILE_NAME: &str = "config.json";
#[allow(dead_code)]
fn is_false(b: bool) -> bool {
!b

View File

@ -157,7 +157,7 @@ checksum = "cdae996d9638ba03253ffa1c93345a585974a97abbdeab9176c77922f3efc1e8"
dependencies = [
"libc",
"log",
"nix 0.23.1",
"nix",
"regex",
]
@ -174,6 +174,28 @@ dependencies = [
"winapi",
]
[[package]]
name = "common"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"containerd-shim-protos",
"kata-sys-util",
"kata-types",
"lazy_static",
"nix",
"oci",
"protobuf",
"serde_json",
"slog",
"slog-scope",
"strum",
"thiserror",
"tokio",
"ttrpc",
]
[[package]]
name = "common-path"
version = "1.0.0"
@ -476,6 +498,12 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -559,7 +587,7 @@ dependencies = [
"kata-types",
"lazy_static",
"libc",
"nix 0.23.1",
"nix",
"oci",
"once_cell",
"serde_json",
@ -622,6 +650,16 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linux_container"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"common",
"tokio",
]
[[package]]
name = "lock_api"
version = "0.4.6"
@ -711,19 +749,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0eaf8df8bab402257e0a5c17a254e4cc1f72a93588a1ddfb5d356c801aa7cb"
dependencies = [
"bitflags",
"cc",
"cfg-if 0.1.10",
"libc",
"void",
]
[[package]]
name = "nix"
version = "0.23.1"
@ -930,7 +955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603"
dependencies = [
"bytes 1.1.0",
"heck",
"heck 0.3.3",
"itertools",
"log",
"multimap",
@ -1078,6 +1103,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "runtimes"
version = "0.1.0"
dependencies = [
"anyhow",
"common",
"kata-types",
"lazy_static",
"linux_container",
"logging",
"oci",
"slog",
"slog-scope",
"tokio",
"virt_container",
"wasm_container",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
@ -1170,6 +1213,22 @@ dependencies = [
"syn",
]
[[package]]
name = "service"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"common",
"containerd-shim-protos",
"logging",
"runtimes",
"slog",
"slog-scope",
"tokio",
"ttrpc",
]
[[package]]
name = "sha2"
version = "0.9.3"
@ -1196,10 +1255,12 @@ dependencies = [
"libc",
"log",
"logging",
"nix 0.16.1",
"nix",
"oci",
"protobuf",
"rand",
"serial_test",
"service",
"sha2",
"slog",
"slog-async",
@ -1213,6 +1274,15 @@ dependencies = [
"vergen",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.5"
@ -1287,6 +1357,28 @@ dependencies = [
"winapi",
]
[[package]]
name = "strum"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef"
dependencies = [
"heck 0.4.0",
"proc-macro2",
"quote",
"rustversion",
"syn",
]
[[package]]
name = "subprocess"
version = "0.2.8"
@ -1411,7 +1503,9 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"once_cell",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi",
@ -1461,7 +1555,7 @@ dependencies = [
"futures 0.3.21",
"libc",
"log",
"nix 0.23.1",
"nix",
"protobuf",
"protobuf-codegen-pure",
"thiserror",
@ -1580,10 +1674,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
name = "virt_container"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"common",
"kata-types",
"tokio",
]
[[package]]
name = "vsock"
@ -1592,7 +1691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e32675ee2b3ce5df274c0ab52d19b28789632406277ca26bffee79a8e27dc133"
dependencies = [
"libc",
"nix 0.23.1",
"nix",
]
[[package]]
@ -1607,6 +1706,16 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm_container"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"common",
"tokio",
]
[[package]]
name = "which"
version = "4.2.5"

View File

@ -120,12 +120,12 @@ fn parse(address: &str, port: u32) -> Result<SockType> {
let url = Url::parse(address).context("parse url")?;
match url.scheme() {
VSOCK_SCHEME => {
let cid = url
let vsock_cid = url
.host_str()
.unwrap_or_default()
.parse::<u32>()
.context("parse cid")?;
Ok(SockType::Vsock(Vsock::new(cid, port)))
.context("parse vsock cid")?;
Ok(SockType::Vsock(Vsock::new(vsock_cid, port)))
}
HYBRID_VSOCK_SCHEME => {
let path: Vec<&str> = url.path().split(':').collect();

View File

@ -14,13 +14,13 @@ unsafe impl Sync for Vsock {}
#[derive(Debug, PartialEq)]
pub struct Vsock {
cid: u32,
vsock_cid: u32,
port: u32,
}
impl Vsock {
pub fn new(cid: u32, port: u32) -> Self {
Self { cid, port }
pub fn new(vsock_cid: u32, port: u32) -> Self {
Self { vsock_cid, port }
}
}

View File

@ -0,0 +1,28 @@
[package]
name = "runtimes"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
lazy_static = "1.4.0"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
common = { path = "./common" }
kata-types = { path = "../../../libs/kata-types" }
logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" }
# runtime handler
linux_container = { path = "./linux_container", optional = true }
virt_container = { path = "./virt_container", optional = true }
wasm_container = { path = "./wasm_container", optional = true }
[features]
default = ["virt"]
linux = ["linux_container"]
virt = ["virt_container"]
wasm = ["wasm_container"]

View File

@ -0,0 +1,26 @@
[package]
name = "common"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
lazy_static = "1.4.0"
nix = "0.23.1"
protobuf = "2.23.0"
serde_json = "1.0.39"
slog = "2.5.2"
slog-scope = "4.4.0"
strum = { version = "0.24.0", features = ["derive"] }
thiserror = "^1.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread", "process", "fs"] }
ttrpc = { version = "0.6.0" }
kata-sys-util = { path = "../../../../libs/kata-sys-util" }
kata-types = { path = "../../../../libs/kata-types" }
oci = { path = "../../../../libs/oci" }

View File

@ -0,0 +1,40 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use async_trait::async_trait;
use crate::types::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
ProcessExitStatus, ProcessStateInfo, ResizePTYRequest, ShutdownRequest, StatsInfo,
UpdateRequest, PID,
};
#[async_trait]
pub trait ContainerManager: Send + Sync {
// container lifecycle
async fn create_container(&self, config: ContainerConfig) -> Result<PID>;
async fn pause_container(&self, container_id: &ContainerID) -> Result<()>;
async fn resume_container(&self, container_id: &ContainerID) -> Result<()>;
async fn stats_container(&self, container_id: &ContainerID) -> Result<StatsInfo>;
async fn update_container(&self, req: UpdateRequest) -> Result<()>;
async fn connect_container(&self, container_id: &ContainerID) -> Result<PID>;
// process lifecycle
async fn close_process_io(&self, process_id: &ContainerProcess) -> Result<()>;
async fn delete_process(&self, process_id: &ContainerProcess) -> Result<ProcessStateInfo>;
async fn exec_process(&self, req: ExecProcessRequest) -> Result<()>;
async fn kill_process(&self, req: &KillRequest) -> Result<()>;
async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()>;
async fn start_process(&self, process_id: &ContainerProcess) -> Result<PID>;
async fn state_process(&self, process_id: &ContainerProcess) -> Result<ProcessStateInfo>;
async fn wait_process(&self, process_id: &ContainerProcess) -> Result<ProcessExitStatus>;
// utility
async fn pid(&self) -> Result<PID>;
async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool;
async fn is_sandbox_container(&self, process_id: &ContainerProcess) -> bool;
}

View File

@ -0,0 +1,17 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::types::{ContainerProcess, Response};
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("failed to find container {0}")]
ContainerNotFound(String),
#[error("failed to find process {0}")]
ProcessNotFound(ContainerProcess),
#[error("unexpected response {0} to shim {1}")]
UnexpectedResponse(Response, String),
}

View File

@ -0,0 +1,15 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod container_manager;
pub use container_manager::ContainerManager;
pub mod error;
pub mod message;
mod runtime_handler;
pub use runtime_handler::{RuntimeHandler, RuntimeInstance};
mod sandbox;
pub use sandbox::Sandbox;
pub mod types;

View File

@ -0,0 +1,44 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use tokio::sync::mpsc::{channel, Receiver, Sender};
/// message receiver buffer size
const MESSAGE_RECEIVER_BUFFER_SIZE: usize = 1;
#[derive(Debug)]
pub enum Action {
Start,
Stop,
Shutdown,
}
#[derive(Debug)]
pub struct Message {
pub action: Action,
pub resp_sender: Option<Sender<Result<()>>>,
}
impl Message {
pub fn new(action: Action) -> Self {
Message {
action,
resp_sender: None,
}
}
pub fn new_with_receiver(action: Action) -> (Receiver<Result<()>>, Self) {
let (resp_sender, receiver) = channel(MESSAGE_RECEIVER_BUFFER_SIZE);
(
receiver,
Message {
action,
resp_sender: Some(resp_sender),
},
)
}
}

View File

@ -0,0 +1,38 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
use crate::{message::Message, ContainerManager, Sandbox};
#[derive(Clone)]
pub struct RuntimeInstance {
pub sandbox: Arc<dyn Sandbox>,
pub container_manager: Arc<dyn ContainerManager>,
}
#[async_trait]
pub trait RuntimeHandler: Send + Sync {
fn init() -> Result<()>
where
Self: Sized;
fn name() -> String
where
Self: Sized;
fn new_handler() -> Arc<dyn RuntimeHandler>
where
Self: Sized;
async fn new_instance(&self, sid: &str, msg_sender: Sender<Message>)
-> Result<RuntimeInstance>;
fn cleanup(&self, id: &str) -> Result<()>;
}

View File

@ -0,0 +1,16 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use async_trait::async_trait;
#[async_trait]
pub trait Sandbox: Send + Sync {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn cleanup(&self, container_id: &str) -> Result<()>;
async fn shutdown(&self) -> Result<()>;
}

View File

@ -0,0 +1,219 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod trans_from_shim;
mod trans_into_shim;
use std::fmt;
use anyhow::{Context, Result};
use kata_sys_util::validate;
use kata_types::mount::Mount;
use strum::Display;
/// Request: request from shim
/// Request and Response messages need to be paired
#[derive(Debug, Clone, Display)]
pub enum Request {
CreateContainer(ContainerConfig),
CloseProcessIO(ContainerProcess),
DeleteProcess(ContainerProcess),
ExecProcess(ExecProcessRequest),
KillProcess(KillRequest),
WaitProcess(ContainerProcess),
StartProcess(ContainerProcess),
StateProcess(ContainerProcess),
ShutdownContainer(ShutdownRequest),
PauseContainer(ContainerID),
ResumeContainer(ContainerID),
ResizeProcessPTY(ResizePTYRequest),
StatsContainer(ContainerID),
UpdateContainer(UpdateRequest),
Pid,
ConnectContainer(ContainerID),
}
/// Response: response to shim
/// Request and Response messages need to be paired
#[derive(Debug, Clone, Display)]
pub enum Response {
CreateContainer(PID),
CloseProcessIO,
DeleteProcess(ProcessStateInfo),
ExecProcess,
KillProcess,
WaitProcess(ProcessExitStatus),
StartProcess(PID),
StateProcess(ProcessStateInfo),
ShutdownContainer,
PauseContainer,
ResumeContainer,
ResizeProcessPTY,
StatsContainer(StatsInfo),
UpdateContainer,
Pid(PID),
ConnectContainer(PID),
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ProcessType {
Container,
Exec,
}
#[derive(Clone, Debug)]
pub struct ContainerID {
pub container_id: String,
}
impl ContainerID {
pub fn new(container_id: &str) -> Result<Self> {
validate::verify_id(container_id).context("verify container id")?;
Ok(Self {
container_id: container_id.to_string(),
})
}
}
#[derive(Clone, Debug)]
pub struct ContainerProcess {
pub container_id: ContainerID,
pub exec_id: String,
pub process_type: ProcessType,
}
impl fmt::Display for ContainerProcess {
fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", &self)
}
}
impl ContainerProcess {
pub fn new(container_id: &str, exec_id: &str) -> Result<Self> {
let (exec_id, process_type) = if exec_id.is_empty() || container_id == exec_id {
("".to_string(), ProcessType::Container)
} else {
validate::verify_id(exec_id).context("verify exec id")?;
(exec_id.to_string(), ProcessType::Exec)
};
Ok(Self {
container_id: ContainerID::new(container_id)?,
exec_id,
process_type,
})
}
}
#[derive(Debug, Clone)]
pub struct ContainerConfig {
pub container_id: String,
pub bundle: String,
pub rootfs_mounts: Vec<Mount>,
pub terminal: bool,
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
}
#[derive(Debug, Clone)]
pub struct PID {
pub pid: u32,
}
impl PID {
pub fn new(pid: u32) -> Self {
Self { pid }
}
}
#[derive(Debug, Clone)]
pub struct KillRequest {
pub process_id: ContainerProcess,
pub signal: u32,
pub all: bool,
}
#[derive(Debug, Clone)]
pub struct ShutdownRequest {
pub container_id: String,
pub is_now: bool,
}
#[derive(Debug, Clone)]
pub struct ResizePTYRequest {
pub process_id: ContainerProcess,
pub width: u32,
pub height: u32,
}
#[derive(Debug, Clone)]
pub struct ExecProcessRequest {
pub process_id: ContainerProcess,
pub terminal: bool,
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub spec_type_url: String,
pub spec_value: Vec<u8>,
}
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum ProcessStatus {
Unknown = 0,
Created = 1,
Running = 2,
Stopped = 3,
Paused = 4,
Pausing = 5,
}
#[derive(Debug, Clone)]
pub struct ProcessStateInfo {
pub container_id: String,
pub exec_id: String,
pub pid: PID,
pub bundle: String,
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub terminal: bool,
pub status: ProcessStatus,
pub exit_status: i32,
pub exited_at: Option<std::time::SystemTime>,
}
#[derive(Debug, Clone, Default)]
pub struct ProcessExitStatus {
pub exit_code: i32,
pub exit_time: Option<std::time::SystemTime>,
}
impl ProcessExitStatus {
pub fn new() -> Self {
Self::default()
}
pub fn update_exit_code(&mut self, exit_code: i32) {
self.exit_code = exit_code;
self.exit_time = Some(std::time::SystemTime::now());
}
}
#[derive(Debug, Clone)]
pub struct StatsInfoValue {
pub type_url: String,
pub value: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct StatsInfo {
pub value: Option<StatsInfoValue>,
}
#[derive(Debug, Clone)]
pub struct UpdateRequest {
pub container_id: String,
pub value: Vec<u8>,
}

View File

@ -0,0 +1,198 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
convert::{From, TryFrom},
path::PathBuf,
};
use anyhow::{Context, Result};
use containerd_shim_protos::api;
use kata_types::mount::Mount;
use super::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request,
ResizePTYRequest, ShutdownRequest, UpdateRequest,
};
fn trans_from_shim_mount(from: api::Mount) -> Mount {
let options = from.options.to_vec();
let mut read_only = false;
for o in &options {
if o == "ro" {
read_only = true;
break;
}
}
Mount {
source: from.source.clone(),
destination: PathBuf::from(&from.target),
fs_type: from.field_type,
options,
device_id: None,
host_shared_fs_path: None,
read_only,
}
}
impl TryFrom<api::CreateTaskRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::CreateTaskRequest) -> Result<Self> {
Ok(Request::CreateContainer(ContainerConfig {
container_id: from.id.clone(),
bundle: from.bundle.clone(),
rootfs_mounts: from
.rootfs
.to_vec()
.into_iter()
.map(trans_from_shim_mount)
.collect(),
terminal: from.terminal,
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()),
stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()),
}))
}
}
impl TryFrom<api::CloseIORequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::CloseIORequest) -> Result<Self> {
Ok(Request::CloseProcessIO(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
))
}
}
impl TryFrom<api::DeleteRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::DeleteRequest) -> Result<Self> {
Ok(Request::DeleteProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
))
}
}
impl TryFrom<api::ExecProcessRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ExecProcessRequest) -> Result<Self> {
let spec = from.get_spec();
Ok(Request::ExecProcess(ExecProcessRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
terminal: from.terminal,
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()),
stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()),
spec_type_url: spec.get_type_url().to_string(),
spec_value: spec.get_value().to_vec(),
}))
}
}
impl TryFrom<api::KillRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::KillRequest) -> Result<Self> {
Ok(Request::KillProcess(KillRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
signal: from.signal,
all: from.all,
}))
}
}
impl TryFrom<api::WaitRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::WaitRequest) -> Result<Self> {
Ok(Request::WaitProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
))
}
}
impl TryFrom<api::StartRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::StartRequest) -> Result<Self> {
Ok(Request::StartProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
))
}
}
impl TryFrom<api::StateRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::StateRequest) -> Result<Self> {
Ok(Request::StateProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
))
}
}
impl TryFrom<api::ShutdownRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ShutdownRequest) -> Result<Self> {
Ok(Request::ShutdownContainer(ShutdownRequest {
container_id: from.id.to_string(),
is_now: from.now,
}))
}
}
impl TryFrom<api::ResizePtyRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ResizePtyRequest) -> Result<Self> {
Ok(Request::ResizeProcessPTY(ResizePTYRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
width: from.width,
height: from.height,
}))
}
}
impl TryFrom<api::PauseRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::PauseRequest) -> Result<Self> {
Ok(Request::PauseContainer(ContainerID::new(&from.id)?))
}
}
impl TryFrom<api::ResumeRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ResumeRequest) -> Result<Self> {
Ok(Request::ResumeContainer(ContainerID::new(&from.id)?))
}
}
impl TryFrom<api::StatsRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::StatsRequest) -> Result<Self> {
Ok(Request::StatsContainer(ContainerID::new(&from.id)?))
}
}
impl TryFrom<api::UpdateTaskRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::UpdateTaskRequest) -> Result<Self> {
Ok(Request::UpdateContainer(UpdateRequest {
container_id: from.id.to_string(),
value: from.get_resources().get_value().to_vec(),
}))
}
}
impl TryFrom<api::PidsRequest> for Request {
type Error = anyhow::Error;
fn try_from(_from: api::PidsRequest) -> Result<Self> {
Ok(Request::Pid)
}
}
impl TryFrom<api::ConnectRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ConnectRequest) -> Result<Self> {
Ok(Request::ConnectContainer(ContainerID::new(&from.id)?))
}
}

View File

@ -0,0 +1,242 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
any::type_name,
convert::{Into, TryFrom, TryInto},
time,
};
use anyhow::{anyhow, Result};
use containerd_shim_protos::api;
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response};
use crate::error::Error;
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::Timestamp {
let mut proto_time = ::protobuf::well_known_types::Timestamp::new();
proto_time.set_seconds(
time.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
.try_into()
.unwrap_or_default(),
);
proto_time
}
fn option_system_time_into(
time: Option<time::SystemTime>,
) -> ::protobuf::SingularPtrField<::protobuf::well_known_types::Timestamp> {
match time {
Some(v) => ::protobuf::SingularPtrField::some(system_time_into(v)),
None => ::protobuf::SingularPtrField::none(),
}
}
impl From<ProcessExitStatus> for api::WaitResponse {
fn from(from: ProcessExitStatus) -> Self {
Self {
exit_status: from.exit_code as u32,
exited_at: option_system_time_into(from.exit_time),
..Default::default()
}
}
}
impl From<ProcessStatus> for api::Status {
fn from(from: ProcessStatus) -> Self {
match from {
ProcessStatus::Unknown => api::Status::UNKNOWN,
ProcessStatus::Created => api::Status::CREATED,
ProcessStatus::Running => api::Status::RUNNING,
ProcessStatus::Stopped => api::Status::STOPPED,
ProcessStatus::Paused => api::Status::PAUSED,
ProcessStatus::Pausing => api::Status::PAUSING,
}
}
}
impl From<ProcessStateInfo> for api::StateResponse {
fn from(from: ProcessStateInfo) -> Self {
Self {
id: from.container_id.clone(),
bundle: from.bundle.clone(),
pid: from.pid.pid,
status: from.status.into(),
stdin: from.stdin.unwrap_or_default(),
stdout: from.stdout.unwrap_or_default(),
stderr: from.stderr.unwrap_or_default(),
terminal: from.terminal,
exit_status: from.exit_status as u32,
exited_at: option_system_time_into(from.exited_at),
exec_id: from.exec_id,
..Default::default()
}
}
}
impl From<ProcessStateInfo> for api::DeleteResponse {
fn from(from: ProcessStateInfo) -> Self {
Self {
pid: from.pid.pid,
exit_status: from.exit_status as u32,
exited_at: option_system_time_into(from.exited_at),
..Default::default()
}
}
}
impl TryFrom<Response> for api::CreateTaskResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::CreateContainer(resp) => Ok(Self {
pid: resp.pid,
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::DeleteResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::DeleteProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::WaitResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::WaitProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::StartResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::StartProcess(resp) => Ok(api::StartResponse {
pid: resp.pid,
..Default::default()
}),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::StateResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::StateProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::StatsResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
let mut any = ::protobuf::well_known_types::Any::new();
let mut response = api::StatsResponse::new();
match from {
Response::StatsContainer(resp) => {
if let Some(value) = resp.value {
any.set_type_url(value.type_url);
any.set_value(value.value);
response.set_stats(any);
}
Ok(response)
}
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::PidsResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::Pid(resp) => {
let mut processes: Vec<api::ProcessInfo> = vec![];
let mut p_info = api::ProcessInfo::new();
let mut res = api::PidsResponse::new();
p_info.set_pid(resp.pid);
processes.push(p_info);
let v = protobuf::RepeatedField::<api::ProcessInfo>::from_vec(processes);
res.set_processes(v);
Ok(res)
}
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::ConnectResponse {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::ConnectContainer(resp) => {
let mut res = api::ConnectResponse::new();
res.set_shim_pid(resp.pid);
Ok(res)
}
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}
impl TryFrom<Response> for api::Empty {
type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> {
match from {
Response::CloseProcessIO => Ok(api::Empty::new()),
Response::ExecProcess => Ok(api::Empty::new()),
Response::KillProcess => Ok(api::Empty::new()),
Response::ShutdownContainer => Ok(api::Empty::new()),
Response::PauseContainer => Ok(api::Empty::new()),
Response::ResumeContainer => Ok(api::Empty::new()),
Response::ResizeProcessPTY => Ok(api::Empty::new()),
Response::UpdateContainer => Ok(api::Empty::new()),
_ => Err(anyhow!(Error::UnexpectedResponse(
from,
type_name::<Self>().to_string()
))),
}
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "linux_container"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
tokio = { version = "1.8.0" }
common = { path = "../common" }

View File

@ -0,0 +1,42 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use tokio::sync::mpsc::Sender;
unsafe impl Send for LinuxContainer {}
unsafe impl Sync for LinuxContainer {}
pub struct LinuxContainer {}
#[async_trait]
impl RuntimeHandler for LinuxContainer {
fn init() -> Result<()> {
Ok(())
}
fn name() -> String {
"linux_container".to_string()
}
fn new_handler() -> Arc<dyn RuntimeHandler> {
Arc::new(LinuxContainer {})
}
async fn new_instance(
&self,
_sid: &str,
_msg_sender: Sender<Message>,
) -> Result<RuntimeInstance> {
todo!()
}
fn cleanup(&self, _id: &str) -> Result<()> {
todo!()
}
}

View File

@ -0,0 +1,13 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
logging::logger_with_subsystem!(sl, "runtimes");
mod manager;
pub use manager::RuntimeHandlerManager;

View File

@ -0,0 +1,251 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{Request, Response},
RuntimeHandler, RuntimeInstance,
};
use kata_types::{annotations::Annotation, config::TomlConfig};
use tokio::sync::{mpsc::Sender, RwLock};
#[cfg(feature = "linux")]
use linux_container::LinuxContainer;
#[cfg(feature = "virt")]
use virt_container::VirtContainer;
#[cfg(feature = "wasm")]
use wasm_container::WasmContainer;
struct RuntimeHandlerManagerInner {
id: String,
msg_sender: Sender<Message>,
runtime_instance: Option<RuntimeInstance>,
}
impl RuntimeHandlerManagerInner {
fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self {
id: id.to_string(),
msg_sender,
runtime_instance: None,
})
}
async fn init_runtime_handler(&mut self, runtime_name: &str) -> Result<()> {
info!(sl!(), "new runtime handler {}", runtime_name);
let runtime_handler = match runtime_name {
#[cfg(feature = "linux")]
name if name == LinuxContainer::name() => {
LinuxContainer::init().context("init linux container")?;
LinuxContainer::new_handler()
}
#[cfg(feature = "wasm")]
name if name == WasmContainer::name() => {
WasmContainer::init().context("init wasm container")?;
WasmContainer::new_handler()
}
#[cfg(feature = "virt")]
name if name == VirtContainer::name() => {
VirtContainer::init().context("init virt container")?;
VirtContainer::new_handler()
}
_ => return Err(anyhow!("Unsupported runtime: {}", runtime_name)),
};
let runtime_instance = runtime_handler
.new_instance(&self.id, self.msg_sender.clone())
.await
.context("new runtime instance")?;
// start sandbox
runtime_instance
.sandbox
.start()
.await
.context("start sandbox")?;
self.runtime_instance = Some(runtime_instance);
Ok(())
}
async fn try_init(&mut self, spec: &oci::Spec) -> Result<()> {
// return if runtime instance has init
if self.runtime_instance.is_some() {
return Ok(());
}
let config = load_config(spec).context("load config")?;
self.init_runtime_handler(&config.runtime.name)
.await
.context("init runtime handler")?;
Ok(())
}
fn get_runtime_instance(&self) -> Option<RuntimeInstance> {
self.runtime_instance.clone()
}
}
unsafe impl Send for RuntimeHandlerManager {}
unsafe impl Sync for RuntimeHandlerManager {}
pub struct RuntimeHandlerManager {
inner: Arc<RwLock<RuntimeHandlerManagerInner>>,
}
impl RuntimeHandlerManager {
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self {
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
id, msg_sender,
)?)),
})
}
pub fn cleanup(_id: &str) -> Result<()> {
// TODO: load runtime from persist and cleanup
Ok(())
}
pub async fn handler_message(&self, req: Request) -> Result<Response> {
if let Request::CreateContainer(req) = req {
// get oci spec
let bundler_path = format!("{}/{}", req.bundle, oci::OCI_SPEC_CONFIG_FILE_NAME);
let spec = oci::Spec::load(&bundler_path).context("load spec")?;
let mut inner = self.inner.write().await;
inner
.try_init(&spec)
.await
.context("try init runtime handler")?;
let instance = inner
.get_runtime_instance()
.ok_or_else(|| anyhow!("runtime not ready"))?;
let shim_pid = instance
.container_manager
.create_container(req)
.await
.context("create container")?;
Ok(Response::CreateContainer(shim_pid))
} else {
self.handler_request(req).await.context("handler request")
}
}
pub async fn handler_request(&self, req: Request) -> Result<Response> {
let inner = self.inner.read().await;
let instance = inner
.get_runtime_instance()
.ok_or_else(|| anyhow!("runtime not ready"))?;
let sandbox = instance.sandbox;
let cm = instance.container_manager;
match req {
Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)),
Request::CloseProcessIO(process_id) => {
cm.close_process_io(&process_id).await.context("close io")?;
Ok(Response::CloseProcessIO)
}
Request::DeleteProcess(process_id) => {
let resp = cm.delete_process(&process_id).await.context("do delete")?;
Ok(Response::DeleteProcess(resp))
}
Request::ExecProcess(req) => {
cm.exec_process(req).await.context("exec")?;
Ok(Response::ExecProcess)
}
Request::KillProcess(req) => {
cm.kill_process(&req).await.context("kill process")?;
Ok(Response::KillProcess)
}
Request::ShutdownContainer(req) => {
if cm.need_shutdown_sandbox(&req).await {
sandbox.shutdown().await.context("do shutdown")?;
}
Ok(Response::ShutdownContainer)
}
Request::WaitProcess(process_id) => {
let exit_status = cm.wait_process(&process_id).await.context("wait process")?;
if cm.is_sandbox_container(&process_id).await {
sandbox.stop().await.context("stop sandbox")?;
}
Ok(Response::WaitProcess(exit_status))
}
Request::StartProcess(process_id) => {
let shim_pid = cm
.start_process(&process_id)
.await
.context("start process")?;
Ok(Response::StartProcess(shim_pid))
}
Request::StateProcess(process_id) => {
let state = cm
.state_process(&process_id)
.await
.context("state process")?;
Ok(Response::StateProcess(state))
}
Request::PauseContainer(container_id) => {
cm.pause_container(&container_id)
.await
.context("pause container")?;
Ok(Response::PauseContainer)
}
Request::ResumeContainer(container_id) => {
cm.resume_container(&container_id)
.await
.context("resume container")?;
Ok(Response::ResumeContainer)
}
Request::ResizeProcessPTY(req) => {
cm.resize_process_pty(&req).await.context("resize pty")?;
Ok(Response::ResizeProcessPTY)
}
Request::StatsContainer(container_id) => {
let stats = cm
.stats_container(&container_id)
.await
.context("stats container")?;
Ok(Response::StatsContainer(stats))
}
Request::UpdateContainer(req) => {
cm.update_container(req).await.context("update container")?;
Ok(Response::UpdateContainer)
}
Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)),
Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer(
cm.connect_container(&container_id)
.await
.context("connect")?,
)),
}
}
}
/// Config override ordering(high to low):
/// 1. podsandbox annotation
/// 2. shimv2 create task option
/// TODO: https://github.com/kata-containers/kata-containers/issues/3961
/// 3. environment
fn load_config(spec: &oci::Spec) -> Result<TomlConfig> {
const KATA_CONF_FILE: &str = "KATA_CONF_FILE";
let annotation = Annotation::new(spec.annotations.clone());
let config_path = if let Some(path) = annotation.get_sandbox_config_path() {
path
} else if let Ok(path) = std::env::var(KATA_CONF_FILE) {
path
} else {
String::from("")
};
info!(sl!(), "get config path {:?}", &config_path);
let (toml_config, _) = TomlConfig::load_from_file(&config_path).context("load toml config")?;
Ok(toml_config)
}

View File

@ -0,0 +1,13 @@
[package]
name = "virt_container"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
tokio = { version = "1.8.0" }
common = { path = "../common" }
kata-types = { path = "../../../../libs/kata-types" }

View File

@ -0,0 +1,47 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig};
use tokio::sync::mpsc::Sender;
unsafe impl Send for VirtContainer {}
unsafe impl Sync for VirtContainer {}
pub struct VirtContainer {}
#[async_trait]
impl RuntimeHandler for VirtContainer {
fn init() -> Result<()> {
// register
let dragonball_config = Arc::new(DragonballConfig::new());
register_hypervisor_plugin("dragonball", dragonball_config);
Ok(())
}
fn name() -> String {
"virt_container".to_string()
}
fn new_handler() -> Arc<dyn RuntimeHandler> {
Arc::new(VirtContainer {})
}
async fn new_instance(
&self,
_sid: &str,
_msg_sender: Sender<Message>,
) -> Result<RuntimeInstance> {
todo!()
}
fn cleanup(&self, _id: &str) -> Result<()> {
// TODO
Ok(())
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "wasm_container"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
tokio = { version = "1.8.0" }
common = { path = "../common" }

View File

@ -0,0 +1,42 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use tokio::sync::mpsc::Sender;
unsafe impl Send for WasmContainer {}
unsafe impl Sync for WasmContainer {}
pub struct WasmContainer {}
#[async_trait]
impl RuntimeHandler for WasmContainer {
fn init() -> Result<()> {
Ok(())
}
fn name() -> String {
"wasm_container".to_string()
}
fn new_handler() -> Arc<dyn RuntimeHandler> {
Arc::new(WasmContainer {})
}
async fn new_instance(
&self,
_sid: &str,
_msg_sender: Sender<Message>,
) -> Result<RuntimeInstance> {
todo!()
}
fn cleanup(&self, _id: &str) -> Result<()> {
todo!()
}
}

View File

@ -0,0 +1,18 @@
[package]
name = "service"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
ttrpc = { version = "0.6.0" }
common = { path = "../runtimes/common" }
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
logging = { path = "../../../libs/logging"}
runtimes = { path = "../runtimes" }

View File

@ -0,0 +1,14 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
logging::logger_with_subsystem!(sl, "service");
mod manager;
pub use manager::ServiceManager;
mod task_service;

View File

@ -0,0 +1,107 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
os::unix::io::{FromRawFd, RawFd},
sync::Arc,
};
use anyhow::{Context, Result};
use common::message::{Action, Message};
use containerd_shim_protos::shim_async;
use runtimes::RuntimeHandlerManager;
use tokio::sync::mpsc::{channel, Receiver};
use ttrpc::asynchronous::Server;
use crate::task_service::TaskService;
/// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8;
pub struct ServiceManager {
receiver: Option<Receiver<Message>>,
handler: Arc<RuntimeHandlerManager>,
task_server: Option<Server>,
}
impl ServiceManager {
pub async fn new(id: &str, task_server_fd: RawFd) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new(
RuntimeHandlerManager::new(id, sender)
.await
.context("new runtime handler")?,
);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix();
Ok(Self {
receiver: Some(receiver),
handler,
task_server: Some(task_server),
})
}
pub async fn run(&mut self) -> Result<()> {
info!(sl!(), "begin to run service");
self.start().await.context("start")?;
let mut rx = self.receiver.take();
if let Some(rx) = rx.as_mut() {
while let Some(r) = rx.recv().await {
info!(sl!(), "receive action {:?}", &r.action);
let result = match r.action {
Action::Start => self.start().await.context("start listen"),
Action::Stop => self.stop_listen().await.context("stop listen"),
Action::Shutdown => {
self.stop_listen().await.context("stop listen")?;
break;
}
};
if let Some(ref sender) = r.resp_sender {
sender.send(result).await.context("send response")?;
}
}
}
info!(sl!(), "end to run service");
Ok(())
}
pub fn cleanup(id: &str) -> Result<()> {
RuntimeHandlerManager::cleanup(id)
}
async fn start(&mut self) -> Result<()> {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let task_server = self.task_server.take();
let task_server = match task_server {
Some(t) => {
let mut t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(())
}
async fn stop_listen(&mut self) -> Result<()> {
let task_server = self.task_server.take();
let task_server = match task_server {
Some(mut t) => {
t.stop_listen().await;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(())
}
}

View File

@ -0,0 +1,82 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use async_trait::async_trait;
use common::types::{Request, Response};
use containerd_shim_protos::{api, shim_async};
use ttrpc::{self, r#async::TtrpcContext};
use runtimes::RuntimeHandlerManager;
pub(crate) struct TaskService {
handler: Arc<RuntimeHandlerManager>,
}
impl TaskService {
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
Self { handler }
}
}
async fn handler_message<TtrpcReq, TtrpcResp>(
s: &RuntimeHandlerManager,
ctx: &TtrpcContext,
req: TtrpcReq,
) -> ttrpc::Result<TtrpcResp>
where
Request: TryFrom<TtrpcReq>,
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
TtrpcResp: TryFrom<Response>,
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug,
{
let r = req
.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?;
let logger = sl!().new(o!("steam id" => ctx.mh.stream_id));
debug!(logger, "====> task service {:?}", &r);
let resp = s
.handler_message(r)
.await
.map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?;
debug!(logger, "<==== task service {:?}", &resp);
Ok(resp
.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))?)
}
macro_rules! impl_service {
($($name: tt | $req: ty | $resp: ty),*) => {
#[async_trait]
impl shim_async::Task for TaskService {
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
handler_message(&self.handler, ctx, req).await
})*
}
};
}
impl_service!(
state | api::StateRequest | api::StateResponse,
create | api::CreateTaskRequest | api::CreateTaskResponse,
start | api::StartRequest | api::StartResponse,
delete | api::DeleteRequest | api::DeleteResponse,
pids | api::PidsRequest | api::PidsResponse,
pause | api::PauseRequest | api::Empty,
resume | api::ResumeRequest | api::Empty,
kill | api::KillRequest | api::Empty,
exec | api::ExecProcessRequest | api::Empty,
resize_pty | api::ResizePtyRequest | api::Empty,
update | api::UpdateTaskRequest | api::Empty,
wait | api::WaitRequest | api::WaitResponse,
stats | api::StatsRequest | api::StatsResponse,
connect | api::ConnectRequest | api::ConnectResponse,
shutdown | api::ShutdownRequest | api::Empty
);

View File

@ -19,7 +19,7 @@ containerd-shim-protos = { version = "0.2.0", features = ["async"]}
go-flag = "0.1.0"
libc = "0.2.108"
log = "0.4.14"
nix = "0.16.0"
nix = "0.23.1"
protobuf = "2.23.0"
sha2 = "=0.9.3"
slog = {version = "2.7.0", features = ["std", "release_max_level_trace", "max_level_trace"]}
@ -34,11 +34,13 @@ kata-types = { path = "../../../libs/kata-types"}
kata-sys-util = { path = "../../../libs/kata-sys-util"}
logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" }
service = { path = "../service" }
[build-dependencies]
vergen = { version = "6", default-features = false, features = ["build", "git", "rustc"] }
[dev-dependencies]
tempfile = "3.2.0"
rand = "0.8.4"
serial_test = "0.5.1"
tests_utils = { path = "../../tests/utils"}

View File

@ -51,8 +51,8 @@ impl Args {
))));
}
validate::verify_cid(&self.id).context("verify cid")?;
validate::verify_cid(&self.namespace).context("verify namespace")?;
validate::verify_id(&self.id).context("verify container id")?;
validate::verify_id(&self.namespace).context("verify namespace")?;
// Ensure `address` is a valid path.
let path = PathBuf::from(self.address.clone())

View File

@ -7,11 +7,7 @@
#[macro_use]
extern crate slog;
macro_rules! sl {
() => {
slog_scope::logger().new(slog::o!("subsystem" => "shim"))
};
}
logging::logger_with_subsystem!(sl, "shim");
mod args;
pub use args::Args;

View File

@ -31,7 +31,7 @@ impl ShimExecutor {
}
pub(crate) fn load_oci_spec(&self, path: &Path) -> Result<oci::Spec> {
let spec_file = path.join("config.json");
let spec_file = path.join(oci::OCI_SPEC_CONFIG_FILE_NAME);
oci::Spec::load(spec_file.to_str().unwrap_or_default()).context("load spec")
}

View File

@ -5,7 +5,7 @@
//
use anyhow::{Context, Result};
use containerd_shim_protos::shim::shim::DeleteResponse;
use containerd_shim_protos::api;
use protobuf::Message;
use crate::{shim::ShimExecutor, Error};
@ -19,8 +19,8 @@ impl ShimExecutor {
Ok(())
}
fn do_cleanup(&self) -> Result<DeleteResponse> {
let mut rsp = DeleteResponse::new();
fn do_cleanup(&self) -> Result<api::DeleteResponse> {
let mut rsp = api::DeleteResponse::new();
rsp.set_exit_status(128 + libc::SIGKILL as u32);
let mut exited_time = protobuf::well_known_types::Timestamp::new();
let seconds = std::time::SystemTime::now()
@ -30,42 +30,7 @@ impl ShimExecutor {
exited_time.set_seconds(seconds);
rsp.set_exited_at(exited_time);
// TODO: implement cleanup
service::ServiceManager::cleanup(&self.args.id).context("cleanup")?;
Ok(rsp)
}
}
#[cfg(test)]
mod tests {
use serial_test::serial;
use tests_utils::gen_id;
use super::*;
use crate::Args;
#[test]
#[serial]
fn test_shim_delete() {
let dir = tempfile::tempdir().unwrap();
let bundle_path = dir.path();
std::env::set_current_dir(bundle_path).unwrap();
let id = gen_id(16);
let namespace = gen_id(16);
let args = Args {
id,
namespace,
address: "containerd_socket".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path.to_str().unwrap().into(),
debug: false,
};
let executor = ShimExecutor::new(args);
let resp = executor.do_cleanup().unwrap();
assert_eq!(resp.exit_status, 128 + libc::SIGKILL as u32);
assert!(resp.exited_at.as_ref().unwrap().seconds > 0);
}
}

View File

@ -38,8 +38,11 @@ impl ShimExecutor {
info!(sl!(), "start to run");
self.args.validate(false).context("validata")?;
let _server_fd = get_server_fd().context("get server fd")?;
// TODO: implement run
let server_fd = get_server_fd().context("get server fd")?;
let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd)
.await
.context("new runtime server")?;
service_manager.run().await.context("run")?;
Ok(())
}