mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-28 19:54:35 +00:00
runtime-rs: service and runtime framework
1. service: Responsible for processing services, such as task service, image service 2. Responsible for implementing different runtimes, such as Virt-container, Linux-container, Wasm-container Fixes: #3785 Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
parent
4296e3069f
commit
bdfee005fa
@ -142,7 +142,7 @@ impl AgentService {
|
||||
) -> Result<()> {
|
||||
let cid = req.container_id.clone();
|
||||
|
||||
kata_sys_util::validate::verify_cid(&cid)?;
|
||||
kata_sys_util::validate::verify_id(&cid)?;
|
||||
|
||||
let mut oci_spec = req.OCI.clone();
|
||||
let use_sandbox_pidns = req.get_sandbox_pidns();
|
||||
|
@ -10,11 +10,11 @@ pub enum Error {
|
||||
InvalidContainerID(String),
|
||||
}
|
||||
|
||||
// A container ID must match this regex:
|
||||
// A container ID or exec ID must match this regex:
|
||||
//
|
||||
// ^[a-zA-Z0-9][a-zA-Z0-9_.-]+$
|
||||
//
|
||||
pub fn verify_cid(id: &str) -> Result<(), Error> {
|
||||
pub fn verify_id(id: &str) -> Result<(), Error> {
|
||||
let mut chars = id.chars();
|
||||
|
||||
let valid = match chars.next() {
|
||||
@ -253,7 +253,7 @@ mod tests {
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
let msg = format!("test[{}]: {:?}", i, d);
|
||||
|
||||
let result = verify_cid(d.id);
|
||||
let result = verify_id(d.id);
|
||||
|
||||
let msg = format!("{}, result: {:?}", msg, result);
|
||||
|
||||
|
@ -13,6 +13,10 @@ use crate::{eother, resolve_path, validate_path};
|
||||
/// Kata runtime configuration information.
|
||||
#[derive(Debug, Default, Deserialize, Serialize)]
|
||||
pub struct Runtime {
|
||||
/// Runtime name: Plan to support virt-container, linux-container, wasm-container
|
||||
#[serde(default)]
|
||||
pub name: String,
|
||||
|
||||
/// If enabled, the runtime will log additional debug messages to the system log.
|
||||
#[serde(default, rename = "enable_debug")]
|
||||
pub debug: bool,
|
||||
@ -238,6 +242,7 @@ vfio_mode = "guest_kernel"
|
||||
fn test_config() {
|
||||
let content = r#"
|
||||
[runtime]
|
||||
name = "virt-container"
|
||||
enable_debug = true
|
||||
experimental = ["a", "b"]
|
||||
internetworking_model = "macvtap"
|
||||
@ -255,6 +260,7 @@ field_should_be_ignored = true
|
||||
"#;
|
||||
let config: TomlConfig = TomlConfig::load(content).unwrap();
|
||||
config.validate().unwrap();
|
||||
assert_eq!(&config.runtime.name, "virt-container");
|
||||
assert!(config.runtime.debug);
|
||||
assert_eq!(config.runtime.experimental.len(), 2);
|
||||
assert_eq!(&config.runtime.experimental[0], "a");
|
||||
|
@ -17,6 +17,17 @@ mod log_writer;
|
||||
pub use file_rotate::FileRotator;
|
||||
pub use log_writer::LogWriter;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! logger_with_subsystem {
|
||||
($name: ident, $subsystem: expr) => {
|
||||
macro_rules! $name {
|
||||
() => {
|
||||
slog_scope::logger().new(slog::o!("subsystem" => $subsystem))
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const LOG_LEVELS: &[(&str, slog::Level)] = &[
|
||||
("trace", slog::Level::Trace),
|
||||
("debug", slog::Level::Debug),
|
||||
|
@ -14,6 +14,8 @@ use std::collections::HashMap;
|
||||
mod serialize;
|
||||
pub use serialize::{to_string, to_writer, Error, Result};
|
||||
|
||||
pub const OCI_SPEC_CONFIG_FILE_NAME: &str = "config.json";
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn is_false(b: bool) -> bool {
|
||||
!b
|
||||
|
155
src/runtime-rs/Cargo.lock
generated
155
src/runtime-rs/Cargo.lock
generated
@ -157,7 +157,7 @@ checksum = "cdae996d9638ba03253ffa1c93345a585974a97abbdeab9176c77922f3efc1e8"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.23.1",
|
||||
"nix",
|
||||
"regex",
|
||||
]
|
||||
|
||||
@ -174,6 +174,28 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"containerd-shim-protos",
|
||||
"kata-sys-util",
|
||||
"kata-types",
|
||||
"lazy_static",
|
||||
"nix",
|
||||
"oci",
|
||||
"protobuf",
|
||||
"serde_json",
|
||||
"slog",
|
||||
"slog-scope",
|
||||
"strum",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"ttrpc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-path"
|
||||
version = "1.0.0"
|
||||
@ -476,6 +498,12 @@ dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
@ -559,7 +587,7 @@ dependencies = [
|
||||
"kata-types",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"nix 0.23.1",
|
||||
"nix",
|
||||
"oci",
|
||||
"once_cell",
|
||||
"serde_json",
|
||||
@ -622,6 +650,16 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linux_container"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"common",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.6"
|
||||
@ -711,19 +749,6 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.16.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd0eaf8df8bab402257e0a5c17a254e4cc1f72a93588a1ddfb5d356c801aa7cb"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cc",
|
||||
"cfg-if 0.1.10",
|
||||
"libc",
|
||||
"void",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.23.1"
|
||||
@ -930,7 +955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603"
|
||||
dependencies = [
|
||||
"bytes 1.1.0",
|
||||
"heck",
|
||||
"heck 0.3.3",
|
||||
"itertools",
|
||||
"log",
|
||||
"multimap",
|
||||
@ -1078,6 +1103,24 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "runtimes"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"common",
|
||||
"kata-types",
|
||||
"lazy_static",
|
||||
"linux_container",
|
||||
"logging",
|
||||
"oci",
|
||||
"slog",
|
||||
"slog-scope",
|
||||
"tokio",
|
||||
"virt_container",
|
||||
"wasm_container",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-demangle"
|
||||
version = "0.1.21"
|
||||
@ -1170,6 +1213,22 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "service"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"common",
|
||||
"containerd-shim-protos",
|
||||
"logging",
|
||||
"runtimes",
|
||||
"slog",
|
||||
"slog-scope",
|
||||
"tokio",
|
||||
"ttrpc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.9.3"
|
||||
@ -1196,10 +1255,12 @@ dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"logging",
|
||||
"nix 0.16.1",
|
||||
"nix",
|
||||
"oci",
|
||||
"protobuf",
|
||||
"rand",
|
||||
"serial_test",
|
||||
"service",
|
||||
"sha2",
|
||||
"slog",
|
||||
"slog-async",
|
||||
@ -1213,6 +1274,15 @@ dependencies = [
|
||||
"vergen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.5"
|
||||
@ -1287,6 +1357,28 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strum"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8"
|
||||
dependencies = [
|
||||
"strum_macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strum_macros"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef"
|
||||
dependencies = [
|
||||
"heck 0.4.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustversion",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "subprocess"
|
||||
version = "0.2.8"
|
||||
@ -1411,7 +1503,9 @@ dependencies = [
|
||||
"memchr",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"winapi",
|
||||
@ -1461,7 +1555,7 @@ dependencies = [
|
||||
"futures 0.3.21",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.23.1",
|
||||
"nix",
|
||||
"protobuf",
|
||||
"protobuf-codegen-pure",
|
||||
"thiserror",
|
||||
@ -1580,10 +1674,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||
|
||||
[[package]]
|
||||
name = "void"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
|
||||
name = "virt_container"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"common",
|
||||
"kata-types",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vsock"
|
||||
@ -1592,7 +1691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e32675ee2b3ce5df274c0ab52d19b28789632406277ca26bffee79a8e27dc133"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"nix 0.23.1",
|
||||
"nix",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1607,6 +1706,16 @@ version = "0.11.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "wasm_container"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"common",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "which"
|
||||
version = "4.2.5"
|
||||
|
@ -120,12 +120,12 @@ fn parse(address: &str, port: u32) -> Result<SockType> {
|
||||
let url = Url::parse(address).context("parse url")?;
|
||||
match url.scheme() {
|
||||
VSOCK_SCHEME => {
|
||||
let cid = url
|
||||
let vsock_cid = url
|
||||
.host_str()
|
||||
.unwrap_or_default()
|
||||
.parse::<u32>()
|
||||
.context("parse cid")?;
|
||||
Ok(SockType::Vsock(Vsock::new(cid, port)))
|
||||
.context("parse vsock cid")?;
|
||||
Ok(SockType::Vsock(Vsock::new(vsock_cid, port)))
|
||||
}
|
||||
HYBRID_VSOCK_SCHEME => {
|
||||
let path: Vec<&str> = url.path().split(':').collect();
|
||||
|
@ -14,13 +14,13 @@ unsafe impl Sync for Vsock {}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct Vsock {
|
||||
cid: u32,
|
||||
vsock_cid: u32,
|
||||
port: u32,
|
||||
}
|
||||
|
||||
impl Vsock {
|
||||
pub fn new(cid: u32, port: u32) -> Self {
|
||||
Self { cid, port }
|
||||
pub fn new(vsock_cid: u32, port: u32) -> Self {
|
||||
Self { vsock_cid, port }
|
||||
}
|
||||
}
|
||||
|
||||
|
28
src/runtime-rs/crates/runtimes/Cargo.toml
Normal file
28
src/runtime-rs/crates/runtimes/Cargo.toml
Normal file
@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "runtimes"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
lazy_static = "1.4.0"
|
||||
slog = "2.5.2"
|
||||
slog-scope = "4.4.0"
|
||||
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
|
||||
|
||||
common = { path = "./common" }
|
||||
kata-types = { path = "../../../libs/kata-types" }
|
||||
logging = { path = "../../../libs/logging"}
|
||||
oci = { path = "../../../libs/oci" }
|
||||
|
||||
# runtime handler
|
||||
linux_container = { path = "./linux_container", optional = true }
|
||||
virt_container = { path = "./virt_container", optional = true }
|
||||
wasm_container = { path = "./wasm_container", optional = true }
|
||||
|
||||
[features]
|
||||
default = ["virt"]
|
||||
linux = ["linux_container"]
|
||||
virt = ["virt_container"]
|
||||
wasm = ["wasm_container"]
|
26
src/runtime-rs/crates/runtimes/common/Cargo.toml
Normal file
26
src/runtime-rs/crates/runtimes/common/Cargo.toml
Normal file
@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "common"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
async-trait = "0.1.48"
|
||||
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
|
||||
lazy_static = "1.4.0"
|
||||
nix = "0.23.1"
|
||||
protobuf = "2.23.0"
|
||||
serde_json = "1.0.39"
|
||||
slog = "2.5.2"
|
||||
slog-scope = "4.4.0"
|
||||
strum = { version = "0.24.0", features = ["derive"] }
|
||||
thiserror = "^1.0"
|
||||
tokio = { version = "1.8.0", features = ["rt-multi-thread", "process", "fs"] }
|
||||
ttrpc = { version = "0.6.0" }
|
||||
|
||||
kata-sys-util = { path = "../../../../libs/kata-sys-util" }
|
||||
kata-types = { path = "../../../../libs/kata-types" }
|
||||
oci = { path = "../../../../libs/oci" }
|
@ -0,0 +1,40 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::types::{
|
||||
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
|
||||
ProcessExitStatus, ProcessStateInfo, ResizePTYRequest, ShutdownRequest, StatsInfo,
|
||||
UpdateRequest, PID,
|
||||
};
|
||||
|
||||
#[async_trait]
|
||||
pub trait ContainerManager: Send + Sync {
|
||||
// container lifecycle
|
||||
async fn create_container(&self, config: ContainerConfig) -> Result<PID>;
|
||||
async fn pause_container(&self, container_id: &ContainerID) -> Result<()>;
|
||||
async fn resume_container(&self, container_id: &ContainerID) -> Result<()>;
|
||||
async fn stats_container(&self, container_id: &ContainerID) -> Result<StatsInfo>;
|
||||
async fn update_container(&self, req: UpdateRequest) -> Result<()>;
|
||||
async fn connect_container(&self, container_id: &ContainerID) -> Result<PID>;
|
||||
|
||||
// process lifecycle
|
||||
async fn close_process_io(&self, process_id: &ContainerProcess) -> Result<()>;
|
||||
async fn delete_process(&self, process_id: &ContainerProcess) -> Result<ProcessStateInfo>;
|
||||
async fn exec_process(&self, req: ExecProcessRequest) -> Result<()>;
|
||||
async fn kill_process(&self, req: &KillRequest) -> Result<()>;
|
||||
async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()>;
|
||||
async fn start_process(&self, process_id: &ContainerProcess) -> Result<PID>;
|
||||
async fn state_process(&self, process_id: &ContainerProcess) -> Result<ProcessStateInfo>;
|
||||
async fn wait_process(&self, process_id: &ContainerProcess) -> Result<ProcessExitStatus>;
|
||||
|
||||
// utility
|
||||
async fn pid(&self) -> Result<PID>;
|
||||
async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool;
|
||||
async fn is_sandbox_container(&self, process_id: &ContainerProcess) -> bool;
|
||||
}
|
17
src/runtime-rs/crates/runtimes/common/src/error.rs
Normal file
17
src/runtime-rs/crates/runtimes/common/src/error.rs
Normal file
@ -0,0 +1,17 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use crate::types::{ContainerProcess, Response};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("failed to find container {0}")]
|
||||
ContainerNotFound(String),
|
||||
#[error("failed to find process {0}")]
|
||||
ProcessNotFound(ContainerProcess),
|
||||
#[error("unexpected response {0} to shim {1}")]
|
||||
UnexpectedResponse(Response, String),
|
||||
}
|
15
src/runtime-rs/crates/runtimes/common/src/lib.rs
Normal file
15
src/runtime-rs/crates/runtimes/common/src/lib.rs
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
mod container_manager;
|
||||
pub use container_manager::ContainerManager;
|
||||
pub mod error;
|
||||
pub mod message;
|
||||
mod runtime_handler;
|
||||
pub use runtime_handler::{RuntimeHandler, RuntimeInstance};
|
||||
mod sandbox;
|
||||
pub use sandbox::Sandbox;
|
||||
pub mod types;
|
44
src/runtime-rs/crates/runtimes/common/src/message.rs
Normal file
44
src/runtime-rs/crates/runtimes/common/src/message.rs
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use anyhow::Result;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
|
||||
/// message receiver buffer size
|
||||
const MESSAGE_RECEIVER_BUFFER_SIZE: usize = 1;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Action {
|
||||
Start,
|
||||
Stop,
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Message {
|
||||
pub action: Action,
|
||||
pub resp_sender: Option<Sender<Result<()>>>,
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub fn new(action: Action) -> Self {
|
||||
Message {
|
||||
action,
|
||||
resp_sender: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_receiver(action: Action) -> (Receiver<Result<()>>, Self) {
|
||||
let (resp_sender, receiver) = channel(MESSAGE_RECEIVER_BUFFER_SIZE);
|
||||
(
|
||||
receiver,
|
||||
Message {
|
||||
action,
|
||||
resp_sender: Some(resp_sender),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
38
src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs
Normal file
38
src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::{message::Message, ContainerManager, Sandbox};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RuntimeInstance {
|
||||
pub sandbox: Arc<dyn Sandbox>,
|
||||
pub container_manager: Arc<dyn ContainerManager>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait RuntimeHandler: Send + Sync {
|
||||
fn init() -> Result<()>
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn name() -> String
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn new_handler() -> Arc<dyn RuntimeHandler>
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
async fn new_instance(&self, sid: &str, msg_sender: Sender<Message>)
|
||||
-> Result<RuntimeInstance>;
|
||||
|
||||
fn cleanup(&self, id: &str) -> Result<()>;
|
||||
}
|
16
src/runtime-rs/crates/runtimes/common/src/sandbox.rs
Normal file
16
src/runtime-rs/crates/runtimes/common/src/sandbox.rs
Normal file
@ -0,0 +1,16 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
|
||||
#[async_trait]
|
||||
pub trait Sandbox: Send + Sync {
|
||||
async fn start(&self) -> Result<()>;
|
||||
async fn stop(&self) -> Result<()>;
|
||||
async fn cleanup(&self, container_id: &str) -> Result<()>;
|
||||
async fn shutdown(&self) -> Result<()>;
|
||||
}
|
219
src/runtime-rs/crates/runtimes/common/src/types/mod.rs
Normal file
219
src/runtime-rs/crates/runtimes/common/src/types/mod.rs
Normal file
@ -0,0 +1,219 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
mod trans_from_shim;
|
||||
mod trans_into_shim;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use kata_sys_util::validate;
|
||||
use kata_types::mount::Mount;
|
||||
use strum::Display;
|
||||
|
||||
/// Request: request from shim
|
||||
/// Request and Response messages need to be paired
|
||||
#[derive(Debug, Clone, Display)]
|
||||
pub enum Request {
|
||||
CreateContainer(ContainerConfig),
|
||||
CloseProcessIO(ContainerProcess),
|
||||
DeleteProcess(ContainerProcess),
|
||||
ExecProcess(ExecProcessRequest),
|
||||
KillProcess(KillRequest),
|
||||
WaitProcess(ContainerProcess),
|
||||
StartProcess(ContainerProcess),
|
||||
StateProcess(ContainerProcess),
|
||||
ShutdownContainer(ShutdownRequest),
|
||||
PauseContainer(ContainerID),
|
||||
ResumeContainer(ContainerID),
|
||||
ResizeProcessPTY(ResizePTYRequest),
|
||||
StatsContainer(ContainerID),
|
||||
UpdateContainer(UpdateRequest),
|
||||
Pid,
|
||||
ConnectContainer(ContainerID),
|
||||
}
|
||||
|
||||
/// Response: response to shim
|
||||
/// Request and Response messages need to be paired
|
||||
#[derive(Debug, Clone, Display)]
|
||||
pub enum Response {
|
||||
CreateContainer(PID),
|
||||
CloseProcessIO,
|
||||
DeleteProcess(ProcessStateInfo),
|
||||
ExecProcess,
|
||||
KillProcess,
|
||||
WaitProcess(ProcessExitStatus),
|
||||
StartProcess(PID),
|
||||
StateProcess(ProcessStateInfo),
|
||||
ShutdownContainer,
|
||||
PauseContainer,
|
||||
ResumeContainer,
|
||||
ResizeProcessPTY,
|
||||
StatsContainer(StatsInfo),
|
||||
UpdateContainer,
|
||||
Pid(PID),
|
||||
ConnectContainer(PID),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum ProcessType {
|
||||
Container,
|
||||
Exec,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ContainerID {
|
||||
pub container_id: String,
|
||||
}
|
||||
|
||||
impl ContainerID {
|
||||
pub fn new(container_id: &str) -> Result<Self> {
|
||||
validate::verify_id(container_id).context("verify container id")?;
|
||||
Ok(Self {
|
||||
container_id: container_id.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ContainerProcess {
|
||||
pub container_id: ContainerID,
|
||||
pub exec_id: String,
|
||||
pub process_type: ProcessType,
|
||||
}
|
||||
|
||||
impl fmt::Display for ContainerProcess {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{:?}", &self)
|
||||
}
|
||||
}
|
||||
|
||||
impl ContainerProcess {
|
||||
pub fn new(container_id: &str, exec_id: &str) -> Result<Self> {
|
||||
let (exec_id, process_type) = if exec_id.is_empty() || container_id == exec_id {
|
||||
("".to_string(), ProcessType::Container)
|
||||
} else {
|
||||
validate::verify_id(exec_id).context("verify exec id")?;
|
||||
(exec_id.to_string(), ProcessType::Exec)
|
||||
};
|
||||
Ok(Self {
|
||||
container_id: ContainerID::new(container_id)?,
|
||||
exec_id,
|
||||
process_type,
|
||||
})
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ContainerConfig {
|
||||
pub container_id: String,
|
||||
pub bundle: String,
|
||||
pub rootfs_mounts: Vec<Mount>,
|
||||
pub terminal: bool,
|
||||
pub stdin: Option<String>,
|
||||
pub stdout: Option<String>,
|
||||
pub stderr: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PID {
|
||||
pub pid: u32,
|
||||
}
|
||||
|
||||
impl PID {
|
||||
pub fn new(pid: u32) -> Self {
|
||||
Self { pid }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KillRequest {
|
||||
pub process_id: ContainerProcess,
|
||||
pub signal: u32,
|
||||
pub all: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ShutdownRequest {
|
||||
pub container_id: String,
|
||||
pub is_now: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ResizePTYRequest {
|
||||
pub process_id: ContainerProcess,
|
||||
pub width: u32,
|
||||
pub height: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ExecProcessRequest {
|
||||
pub process_id: ContainerProcess,
|
||||
pub terminal: bool,
|
||||
pub stdin: Option<String>,
|
||||
pub stdout: Option<String>,
|
||||
pub stderr: Option<String>,
|
||||
pub spec_type_url: String,
|
||||
pub spec_value: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Debug)]
|
||||
pub enum ProcessStatus {
|
||||
Unknown = 0,
|
||||
Created = 1,
|
||||
Running = 2,
|
||||
Stopped = 3,
|
||||
Paused = 4,
|
||||
Pausing = 5,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProcessStateInfo {
|
||||
pub container_id: String,
|
||||
pub exec_id: String,
|
||||
pub pid: PID,
|
||||
pub bundle: String,
|
||||
pub stdin: Option<String>,
|
||||
pub stdout: Option<String>,
|
||||
pub stderr: Option<String>,
|
||||
pub terminal: bool,
|
||||
pub status: ProcessStatus,
|
||||
pub exit_status: i32,
|
||||
pub exited_at: Option<std::time::SystemTime>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ProcessExitStatus {
|
||||
pub exit_code: i32,
|
||||
pub exit_time: Option<std::time::SystemTime>,
|
||||
}
|
||||
|
||||
impl ProcessExitStatus {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn update_exit_code(&mut self, exit_code: i32) {
|
||||
self.exit_code = exit_code;
|
||||
self.exit_time = Some(std::time::SystemTime::now());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StatsInfoValue {
|
||||
pub type_url: String,
|
||||
pub value: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StatsInfo {
|
||||
pub value: Option<StatsInfoValue>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpdateRequest {
|
||||
pub container_id: String,
|
||||
pub value: Vec<u8>,
|
||||
}
|
@ -0,0 +1,198 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::{
|
||||
convert::{From, TryFrom},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use containerd_shim_protos::api;
|
||||
use kata_types::mount::Mount;
|
||||
|
||||
use super::{
|
||||
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request,
|
||||
ResizePTYRequest, ShutdownRequest, UpdateRequest,
|
||||
};
|
||||
|
||||
fn trans_from_shim_mount(from: api::Mount) -> Mount {
|
||||
let options = from.options.to_vec();
|
||||
let mut read_only = false;
|
||||
for o in &options {
|
||||
if o == "ro" {
|
||||
read_only = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Mount {
|
||||
source: from.source.clone(),
|
||||
destination: PathBuf::from(&from.target),
|
||||
fs_type: from.field_type,
|
||||
options,
|
||||
device_id: None,
|
||||
host_shared_fs_path: None,
|
||||
read_only,
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::CreateTaskRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::CreateTaskRequest) -> Result<Self> {
|
||||
Ok(Request::CreateContainer(ContainerConfig {
|
||||
container_id: from.id.clone(),
|
||||
bundle: from.bundle.clone(),
|
||||
rootfs_mounts: from
|
||||
.rootfs
|
||||
.to_vec()
|
||||
.into_iter()
|
||||
.map(trans_from_shim_mount)
|
||||
.collect(),
|
||||
terminal: from.terminal,
|
||||
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
|
||||
stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()),
|
||||
stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::CloseIORequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::CloseIORequest) -> Result<Self> {
|
||||
Ok(Request::CloseProcessIO(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::DeleteRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::DeleteRequest) -> Result<Self> {
|
||||
Ok(Request::DeleteProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ExecProcessRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ExecProcessRequest) -> Result<Self> {
|
||||
let spec = from.get_spec();
|
||||
Ok(Request::ExecProcess(ExecProcessRequest {
|
||||
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
terminal: from.terminal,
|
||||
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
|
||||
stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()),
|
||||
stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()),
|
||||
spec_type_url: spec.get_type_url().to_string(),
|
||||
spec_value: spec.get_value().to_vec(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::KillRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::KillRequest) -> Result<Self> {
|
||||
Ok(Request::KillProcess(KillRequest {
|
||||
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
signal: from.signal,
|
||||
all: from.all,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::WaitRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::WaitRequest) -> Result<Self> {
|
||||
Ok(Request::WaitProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StartRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StartRequest) -> Result<Self> {
|
||||
Ok(Request::StartProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StateRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StateRequest) -> Result<Self> {
|
||||
Ok(Request::StateProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ShutdownRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ShutdownRequest) -> Result<Self> {
|
||||
Ok(Request::ShutdownContainer(ShutdownRequest {
|
||||
container_id: from.id.to_string(),
|
||||
is_now: from.now,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ResizePtyRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ResizePtyRequest) -> Result<Self> {
|
||||
Ok(Request::ResizeProcessPTY(ResizePTYRequest {
|
||||
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
width: from.width,
|
||||
height: from.height,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::PauseRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::PauseRequest) -> Result<Self> {
|
||||
Ok(Request::PauseContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ResumeRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ResumeRequest) -> Result<Self> {
|
||||
Ok(Request::ResumeContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StatsRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StatsRequest) -> Result<Self> {
|
||||
Ok(Request::StatsContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::UpdateTaskRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::UpdateTaskRequest) -> Result<Self> {
|
||||
Ok(Request::UpdateContainer(UpdateRequest {
|
||||
container_id: from.id.to_string(),
|
||||
value: from.get_resources().get_value().to_vec(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::PidsRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(_from: api::PidsRequest) -> Result<Self> {
|
||||
Ok(Request::Pid)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ConnectRequest> for Request {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ConnectRequest) -> Result<Self> {
|
||||
Ok(Request::ConnectContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
@ -0,0 +1,242 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::{
|
||||
any::type_name,
|
||||
convert::{Into, TryFrom, TryInto},
|
||||
time,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use containerd_shim_protos::api;
|
||||
|
||||
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response};
|
||||
use crate::error::Error;
|
||||
|
||||
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::Timestamp {
|
||||
let mut proto_time = ::protobuf::well_known_types::Timestamp::new();
|
||||
proto_time.set_seconds(
|
||||
time.duration_since(time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
.try_into()
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
proto_time
|
||||
}
|
||||
|
||||
fn option_system_time_into(
|
||||
time: Option<time::SystemTime>,
|
||||
) -> ::protobuf::SingularPtrField<::protobuf::well_known_types::Timestamp> {
|
||||
match time {
|
||||
Some(v) => ::protobuf::SingularPtrField::some(system_time_into(v)),
|
||||
None => ::protobuf::SingularPtrField::none(),
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProcessExitStatus> for api::WaitResponse {
|
||||
fn from(from: ProcessExitStatus) -> Self {
|
||||
Self {
|
||||
exit_status: from.exit_code as u32,
|
||||
exited_at: option_system_time_into(from.exit_time),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProcessStatus> for api::Status {
|
||||
fn from(from: ProcessStatus) -> Self {
|
||||
match from {
|
||||
ProcessStatus::Unknown => api::Status::UNKNOWN,
|
||||
ProcessStatus::Created => api::Status::CREATED,
|
||||
ProcessStatus::Running => api::Status::RUNNING,
|
||||
ProcessStatus::Stopped => api::Status::STOPPED,
|
||||
ProcessStatus::Paused => api::Status::PAUSED,
|
||||
ProcessStatus::Pausing => api::Status::PAUSING,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl From<ProcessStateInfo> for api::StateResponse {
|
||||
fn from(from: ProcessStateInfo) -> Self {
|
||||
Self {
|
||||
id: from.container_id.clone(),
|
||||
bundle: from.bundle.clone(),
|
||||
pid: from.pid.pid,
|
||||
status: from.status.into(),
|
||||
stdin: from.stdin.unwrap_or_default(),
|
||||
stdout: from.stdout.unwrap_or_default(),
|
||||
stderr: from.stderr.unwrap_or_default(),
|
||||
terminal: from.terminal,
|
||||
exit_status: from.exit_status as u32,
|
||||
exited_at: option_system_time_into(from.exited_at),
|
||||
exec_id: from.exec_id,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProcessStateInfo> for api::DeleteResponse {
|
||||
fn from(from: ProcessStateInfo) -> Self {
|
||||
Self {
|
||||
pid: from.pid.pid,
|
||||
exit_status: from.exit_status as u32,
|
||||
exited_at: option_system_time_into(from.exited_at),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::CreateTaskResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::CreateContainer(resp) => Ok(Self {
|
||||
pid: resp.pid,
|
||||
..Default::default()
|
||||
}),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::DeleteResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::DeleteProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::WaitResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::WaitProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StartResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::StartProcess(resp) => Ok(api::StartResponse {
|
||||
pid: resp.pid,
|
||||
..Default::default()
|
||||
}),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StateResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::StateProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StatsResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
let mut any = ::protobuf::well_known_types::Any::new();
|
||||
let mut response = api::StatsResponse::new();
|
||||
match from {
|
||||
Response::StatsContainer(resp) => {
|
||||
if let Some(value) = resp.value {
|
||||
any.set_type_url(value.type_url);
|
||||
any.set_value(value.value);
|
||||
response.set_stats(any);
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::PidsResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::Pid(resp) => {
|
||||
let mut processes: Vec<api::ProcessInfo> = vec![];
|
||||
let mut p_info = api::ProcessInfo::new();
|
||||
let mut res = api::PidsResponse::new();
|
||||
p_info.set_pid(resp.pid);
|
||||
processes.push(p_info);
|
||||
let v = protobuf::RepeatedField::<api::ProcessInfo>::from_vec(processes);
|
||||
res.set_processes(v);
|
||||
Ok(res)
|
||||
}
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::ConnectResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::ConnectContainer(resp) => {
|
||||
let mut res = api::ConnectResponse::new();
|
||||
res.set_shim_pid(resp.pid);
|
||||
Ok(res)
|
||||
}
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::Empty {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
match from {
|
||||
Response::CloseProcessIO => Ok(api::Empty::new()),
|
||||
Response::ExecProcess => Ok(api::Empty::new()),
|
||||
Response::KillProcess => Ok(api::Empty::new()),
|
||||
Response::ShutdownContainer => Ok(api::Empty::new()),
|
||||
Response::PauseContainer => Ok(api::Empty::new()),
|
||||
Response::ResumeContainer => Ok(api::Empty::new()),
|
||||
Response::ResizeProcessPTY => Ok(api::Empty::new()),
|
||||
Response::UpdateContainer => Ok(api::Empty::new()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
12
src/runtime-rs/crates/runtimes/linux_container/Cargo.toml
Normal file
12
src/runtime-rs/crates/runtimes/linux_container/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "linux_container"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
async-trait = "0.1.48"
|
||||
tokio = { version = "1.8.0" }
|
||||
|
||||
common = { path = "../common" }
|
42
src/runtime-rs/crates/runtimes/linux_container/src/lib.rs
Normal file
42
src/runtime-rs/crates/runtimes/linux_container/src/lib.rs
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use common::{message::Message, RuntimeHandler, RuntimeInstance};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
unsafe impl Send for LinuxContainer {}
|
||||
unsafe impl Sync for LinuxContainer {}
|
||||
pub struct LinuxContainer {}
|
||||
|
||||
#[async_trait]
|
||||
impl RuntimeHandler for LinuxContainer {
|
||||
fn init() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name() -> String {
|
||||
"linux_container".to_string()
|
||||
}
|
||||
|
||||
fn new_handler() -> Arc<dyn RuntimeHandler> {
|
||||
Arc::new(LinuxContainer {})
|
||||
}
|
||||
|
||||
async fn new_instance(
|
||||
&self,
|
||||
_sid: &str,
|
||||
_msg_sender: Sender<Message>,
|
||||
) -> Result<RuntimeInstance> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn cleanup(&self, _id: &str) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
13
src/runtime-rs/crates/runtimes/src/lib.rs
Normal file
13
src/runtime-rs/crates/runtimes/src/lib.rs
Normal file
@ -0,0 +1,13 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
#[macro_use]
|
||||
extern crate slog;
|
||||
|
||||
logging::logger_with_subsystem!(sl, "runtimes");
|
||||
|
||||
mod manager;
|
||||
pub use manager::RuntimeHandlerManager;
|
251
src/runtime-rs/crates/runtimes/src/manager.rs
Normal file
251
src/runtime-rs/crates/runtimes/src/manager.rs
Normal file
@ -0,0 +1,251 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use common::{
|
||||
message::Message,
|
||||
types::{Request, Response},
|
||||
RuntimeHandler, RuntimeInstance,
|
||||
};
|
||||
use kata_types::{annotations::Annotation, config::TomlConfig};
|
||||
use tokio::sync::{mpsc::Sender, RwLock};
|
||||
|
||||
#[cfg(feature = "linux")]
|
||||
use linux_container::LinuxContainer;
|
||||
#[cfg(feature = "virt")]
|
||||
use virt_container::VirtContainer;
|
||||
#[cfg(feature = "wasm")]
|
||||
use wasm_container::WasmContainer;
|
||||
|
||||
struct RuntimeHandlerManagerInner {
|
||||
id: String,
|
||||
msg_sender: Sender<Message>,
|
||||
runtime_instance: Option<RuntimeInstance>,
|
||||
}
|
||||
|
||||
impl RuntimeHandlerManagerInner {
|
||||
fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
id: id.to_string(),
|
||||
msg_sender,
|
||||
runtime_instance: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn init_runtime_handler(&mut self, runtime_name: &str) -> Result<()> {
|
||||
info!(sl!(), "new runtime handler {}", runtime_name);
|
||||
|
||||
let runtime_handler = match runtime_name {
|
||||
#[cfg(feature = "linux")]
|
||||
name if name == LinuxContainer::name() => {
|
||||
LinuxContainer::init().context("init linux container")?;
|
||||
LinuxContainer::new_handler()
|
||||
}
|
||||
#[cfg(feature = "wasm")]
|
||||
name if name == WasmContainer::name() => {
|
||||
WasmContainer::init().context("init wasm container")?;
|
||||
WasmContainer::new_handler()
|
||||
}
|
||||
#[cfg(feature = "virt")]
|
||||
name if name == VirtContainer::name() => {
|
||||
VirtContainer::init().context("init virt container")?;
|
||||
VirtContainer::new_handler()
|
||||
}
|
||||
_ => return Err(anyhow!("Unsupported runtime: {}", runtime_name)),
|
||||
};
|
||||
let runtime_instance = runtime_handler
|
||||
.new_instance(&self.id, self.msg_sender.clone())
|
||||
.await
|
||||
.context("new runtime instance")?;
|
||||
|
||||
// start sandbox
|
||||
runtime_instance
|
||||
.sandbox
|
||||
.start()
|
||||
.await
|
||||
.context("start sandbox")?;
|
||||
self.runtime_instance = Some(runtime_instance);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_init(&mut self, spec: &oci::Spec) -> Result<()> {
|
||||
// return if runtime instance has init
|
||||
if self.runtime_instance.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let config = load_config(spec).context("load config")?;
|
||||
self.init_runtime_handler(&config.runtime.name)
|
||||
.await
|
||||
.context("init runtime handler")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_runtime_instance(&self) -> Option<RuntimeInstance> {
|
||||
self.runtime_instance.clone()
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for RuntimeHandlerManager {}
|
||||
unsafe impl Sync for RuntimeHandlerManager {}
|
||||
pub struct RuntimeHandlerManager {
|
||||
inner: Arc<RwLock<RuntimeHandlerManagerInner>>,
|
||||
}
|
||||
|
||||
impl RuntimeHandlerManager {
|
||||
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
|
||||
id, msg_sender,
|
||||
)?)),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn cleanup(_id: &str) -> Result<()> {
|
||||
// TODO: load runtime from persist and cleanup
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handler_message(&self, req: Request) -> Result<Response> {
|
||||
if let Request::CreateContainer(req) = req {
|
||||
// get oci spec
|
||||
let bundler_path = format!("{}/{}", req.bundle, oci::OCI_SPEC_CONFIG_FILE_NAME);
|
||||
let spec = oci::Spec::load(&bundler_path).context("load spec")?;
|
||||
|
||||
let mut inner = self.inner.write().await;
|
||||
inner
|
||||
.try_init(&spec)
|
||||
.await
|
||||
.context("try init runtime handler")?;
|
||||
|
||||
let instance = inner
|
||||
.get_runtime_instance()
|
||||
.ok_or_else(|| anyhow!("runtime not ready"))?;
|
||||
|
||||
let shim_pid = instance
|
||||
.container_manager
|
||||
.create_container(req)
|
||||
.await
|
||||
.context("create container")?;
|
||||
Ok(Response::CreateContainer(shim_pid))
|
||||
} else {
|
||||
self.handler_request(req).await.context("handler request")
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handler_request(&self, req: Request) -> Result<Response> {
|
||||
let inner = self.inner.read().await;
|
||||
let instance = inner
|
||||
.get_runtime_instance()
|
||||
.ok_or_else(|| anyhow!("runtime not ready"))?;
|
||||
let sandbox = instance.sandbox;
|
||||
let cm = instance.container_manager;
|
||||
|
||||
match req {
|
||||
Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)),
|
||||
Request::CloseProcessIO(process_id) => {
|
||||
cm.close_process_io(&process_id).await.context("close io")?;
|
||||
Ok(Response::CloseProcessIO)
|
||||
}
|
||||
Request::DeleteProcess(process_id) => {
|
||||
let resp = cm.delete_process(&process_id).await.context("do delete")?;
|
||||
Ok(Response::DeleteProcess(resp))
|
||||
}
|
||||
Request::ExecProcess(req) => {
|
||||
cm.exec_process(req).await.context("exec")?;
|
||||
Ok(Response::ExecProcess)
|
||||
}
|
||||
Request::KillProcess(req) => {
|
||||
cm.kill_process(&req).await.context("kill process")?;
|
||||
Ok(Response::KillProcess)
|
||||
}
|
||||
Request::ShutdownContainer(req) => {
|
||||
if cm.need_shutdown_sandbox(&req).await {
|
||||
sandbox.shutdown().await.context("do shutdown")?;
|
||||
}
|
||||
Ok(Response::ShutdownContainer)
|
||||
}
|
||||
Request::WaitProcess(process_id) => {
|
||||
let exit_status = cm.wait_process(&process_id).await.context("wait process")?;
|
||||
if cm.is_sandbox_container(&process_id).await {
|
||||
sandbox.stop().await.context("stop sandbox")?;
|
||||
}
|
||||
Ok(Response::WaitProcess(exit_status))
|
||||
}
|
||||
Request::StartProcess(process_id) => {
|
||||
let shim_pid = cm
|
||||
.start_process(&process_id)
|
||||
.await
|
||||
.context("start process")?;
|
||||
Ok(Response::StartProcess(shim_pid))
|
||||
}
|
||||
|
||||
Request::StateProcess(process_id) => {
|
||||
let state = cm
|
||||
.state_process(&process_id)
|
||||
.await
|
||||
.context("state process")?;
|
||||
Ok(Response::StateProcess(state))
|
||||
}
|
||||
Request::PauseContainer(container_id) => {
|
||||
cm.pause_container(&container_id)
|
||||
.await
|
||||
.context("pause container")?;
|
||||
Ok(Response::PauseContainer)
|
||||
}
|
||||
Request::ResumeContainer(container_id) => {
|
||||
cm.resume_container(&container_id)
|
||||
.await
|
||||
.context("resume container")?;
|
||||
Ok(Response::ResumeContainer)
|
||||
}
|
||||
Request::ResizeProcessPTY(req) => {
|
||||
cm.resize_process_pty(&req).await.context("resize pty")?;
|
||||
Ok(Response::ResizeProcessPTY)
|
||||
}
|
||||
Request::StatsContainer(container_id) => {
|
||||
let stats = cm
|
||||
.stats_container(&container_id)
|
||||
.await
|
||||
.context("stats container")?;
|
||||
Ok(Response::StatsContainer(stats))
|
||||
}
|
||||
Request::UpdateContainer(req) => {
|
||||
cm.update_container(req).await.context("update container")?;
|
||||
Ok(Response::UpdateContainer)
|
||||
}
|
||||
Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)),
|
||||
Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer(
|
||||
cm.connect_container(&container_id)
|
||||
.await
|
||||
.context("connect")?,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Config override ordering(high to low):
|
||||
/// 1. podsandbox annotation
|
||||
/// 2. shimv2 create task option
|
||||
/// TODO: https://github.com/kata-containers/kata-containers/issues/3961
|
||||
/// 3. environment
|
||||
fn load_config(spec: &oci::Spec) -> Result<TomlConfig> {
|
||||
const KATA_CONF_FILE: &str = "KATA_CONF_FILE";
|
||||
let annotation = Annotation::new(spec.annotations.clone());
|
||||
let config_path = if let Some(path) = annotation.get_sandbox_config_path() {
|
||||
path
|
||||
} else if let Ok(path) = std::env::var(KATA_CONF_FILE) {
|
||||
path
|
||||
} else {
|
||||
String::from("")
|
||||
};
|
||||
info!(sl!(), "get config path {:?}", &config_path);
|
||||
let (toml_config, _) = TomlConfig::load_from_file(&config_path).context("load toml config")?;
|
||||
Ok(toml_config)
|
||||
}
|
13
src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
Normal file
13
src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "virt_container"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
async-trait = "0.1.48"
|
||||
tokio = { version = "1.8.0" }
|
||||
|
||||
common = { path = "../common" }
|
||||
kata-types = { path = "../../../../libs/kata-types" }
|
47
src/runtime-rs/crates/runtimes/virt_container/src/lib.rs
Normal file
47
src/runtime-rs/crates/runtimes/virt_container/src/lib.rs
Normal file
@ -0,0 +1,47 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use common::{message::Message, RuntimeHandler, RuntimeInstance};
|
||||
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
unsafe impl Send for VirtContainer {}
|
||||
unsafe impl Sync for VirtContainer {}
|
||||
pub struct VirtContainer {}
|
||||
|
||||
#[async_trait]
|
||||
impl RuntimeHandler for VirtContainer {
|
||||
fn init() -> Result<()> {
|
||||
// register
|
||||
let dragonball_config = Arc::new(DragonballConfig::new());
|
||||
register_hypervisor_plugin("dragonball", dragonball_config);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name() -> String {
|
||||
"virt_container".to_string()
|
||||
}
|
||||
|
||||
fn new_handler() -> Arc<dyn RuntimeHandler> {
|
||||
Arc::new(VirtContainer {})
|
||||
}
|
||||
|
||||
async fn new_instance(
|
||||
&self,
|
||||
_sid: &str,
|
||||
_msg_sender: Sender<Message>,
|
||||
) -> Result<RuntimeInstance> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn cleanup(&self, _id: &str) -> Result<()> {
|
||||
// TODO
|
||||
Ok(())
|
||||
}
|
||||
}
|
12
src/runtime-rs/crates/runtimes/wasm_container/Cargo.toml
Normal file
12
src/runtime-rs/crates/runtimes/wasm_container/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "wasm_container"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
async-trait = "0.1.48"
|
||||
tokio = { version = "1.8.0" }
|
||||
|
||||
common = { path = "../common" }
|
42
src/runtime-rs/crates/runtimes/wasm_container/src/lib.rs
Normal file
42
src/runtime-rs/crates/runtimes/wasm_container/src/lib.rs
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use common::{message::Message, RuntimeHandler, RuntimeInstance};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
unsafe impl Send for WasmContainer {}
|
||||
unsafe impl Sync for WasmContainer {}
|
||||
pub struct WasmContainer {}
|
||||
|
||||
#[async_trait]
|
||||
impl RuntimeHandler for WasmContainer {
|
||||
fn init() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name() -> String {
|
||||
"wasm_container".to_string()
|
||||
}
|
||||
|
||||
fn new_handler() -> Arc<dyn RuntimeHandler> {
|
||||
Arc::new(WasmContainer {})
|
||||
}
|
||||
|
||||
async fn new_instance(
|
||||
&self,
|
||||
_sid: &str,
|
||||
_msg_sender: Sender<Message>,
|
||||
) -> Result<RuntimeInstance> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn cleanup(&self, _id: &str) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
18
src/runtime-rs/crates/service/Cargo.toml
Normal file
18
src/runtime-rs/crates/service/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "service"
|
||||
version = "0.1.0"
|
||||
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "^1.0"
|
||||
async-trait = "0.1.48"
|
||||
slog = "2.5.2"
|
||||
slog-scope = "4.4.0"
|
||||
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
|
||||
ttrpc = { version = "0.6.0" }
|
||||
|
||||
common = { path = "../runtimes/common" }
|
||||
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
|
||||
logging = { path = "../../../libs/logging"}
|
||||
runtimes = { path = "../runtimes" }
|
14
src/runtime-rs/crates/service/src/lib.rs
Normal file
14
src/runtime-rs/crates/service/src/lib.rs
Normal file
@ -0,0 +1,14 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
#[macro_use]
|
||||
extern crate slog;
|
||||
|
||||
logging::logger_with_subsystem!(sl, "service");
|
||||
|
||||
mod manager;
|
||||
pub use manager::ServiceManager;
|
||||
mod task_service;
|
107
src/runtime-rs/crates/service/src/manager.rs
Normal file
107
src/runtime-rs/crates/service/src/manager.rs
Normal file
@ -0,0 +1,107 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::{
|
||||
os::unix::io::{FromRawFd, RawFd},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use common::message::{Action, Message};
|
||||
use containerd_shim_protos::shim_async;
|
||||
use runtimes::RuntimeHandlerManager;
|
||||
use tokio::sync::mpsc::{channel, Receiver};
|
||||
use ttrpc::asynchronous::Server;
|
||||
|
||||
use crate::task_service::TaskService;
|
||||
|
||||
/// message buffer size
|
||||
const MESSAGE_BUFFER_SIZE: usize = 8;
|
||||
|
||||
pub struct ServiceManager {
|
||||
receiver: Option<Receiver<Message>>,
|
||||
handler: Arc<RuntimeHandlerManager>,
|
||||
task_server: Option<Server>,
|
||||
}
|
||||
|
||||
impl ServiceManager {
|
||||
pub async fn new(id: &str, task_server_fd: RawFd) -> Result<Self> {
|
||||
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||
let handler = Arc::new(
|
||||
RuntimeHandlerManager::new(id, sender)
|
||||
.await
|
||||
.context("new runtime handler")?,
|
||||
);
|
||||
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||
task_server = task_server.set_domain_unix();
|
||||
Ok(Self {
|
||||
receiver: Some(receiver),
|
||||
handler,
|
||||
task_server: Some(task_server),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
info!(sl!(), "begin to run service");
|
||||
|
||||
self.start().await.context("start")?;
|
||||
let mut rx = self.receiver.take();
|
||||
if let Some(rx) = rx.as_mut() {
|
||||
while let Some(r) = rx.recv().await {
|
||||
info!(sl!(), "receive action {:?}", &r.action);
|
||||
let result = match r.action {
|
||||
Action::Start => self.start().await.context("start listen"),
|
||||
Action::Stop => self.stop_listen().await.context("stop listen"),
|
||||
Action::Shutdown => {
|
||||
self.stop_listen().await.context("stop listen")?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(ref sender) = r.resp_sender {
|
||||
sender.send(result).await.context("send response")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(sl!(), "end to run service");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn cleanup(id: &str) -> Result<()> {
|
||||
RuntimeHandlerManager::cleanup(id)
|
||||
}
|
||||
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||
as Box<dyn shim_async::Task + Send + Sync>);
|
||||
let task_server = self.task_server.take();
|
||||
let task_server = match task_server {
|
||||
Some(t) => {
|
||||
let mut t = t.register_service(shim_async::create_task(task_service));
|
||||
t.start().await.context("task server start")?;
|
||||
Some(t)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
self.task_server = task_server;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_listen(&mut self) -> Result<()> {
|
||||
let task_server = self.task_server.take();
|
||||
let task_server = match task_server {
|
||||
Some(mut t) => {
|
||||
t.stop_listen().await;
|
||||
Some(t)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
self.task_server = task_server;
|
||||
Ok(())
|
||||
}
|
||||
}
|
82
src/runtime-rs/crates/service/src/task_service.rs
Normal file
82
src/runtime-rs/crates/service/src/task_service.rs
Normal file
@ -0,0 +1,82 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common::types::{Request, Response};
|
||||
use containerd_shim_protos::{api, shim_async};
|
||||
use ttrpc::{self, r#async::TtrpcContext};
|
||||
|
||||
use runtimes::RuntimeHandlerManager;
|
||||
|
||||
pub(crate) struct TaskService {
|
||||
handler: Arc<RuntimeHandlerManager>,
|
||||
}
|
||||
|
||||
impl TaskService {
|
||||
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
|
||||
Self { handler }
|
||||
}
|
||||
}
|
||||
|
||||
async fn handler_message<TtrpcReq, TtrpcResp>(
|
||||
s: &RuntimeHandlerManager,
|
||||
ctx: &TtrpcContext,
|
||||
req: TtrpcReq,
|
||||
) -> ttrpc::Result<TtrpcResp>
|
||||
where
|
||||
Request: TryFrom<TtrpcReq>,
|
||||
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
|
||||
TtrpcResp: TryFrom<Response>,
|
||||
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug,
|
||||
{
|
||||
let r = req
|
||||
.try_into()
|
||||
.map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?;
|
||||
let logger = sl!().new(o!("steam id" => ctx.mh.stream_id));
|
||||
debug!(logger, "====> task service {:?}", &r);
|
||||
let resp = s
|
||||
.handler_message(r)
|
||||
.await
|
||||
.map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?;
|
||||
debug!(logger, "<==== task service {:?}", &resp);
|
||||
Ok(resp
|
||||
.try_into()
|
||||
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))?)
|
||||
}
|
||||
|
||||
macro_rules! impl_service {
|
||||
($($name: tt | $req: ty | $resp: ty),*) => {
|
||||
#[async_trait]
|
||||
impl shim_async::Task for TaskService {
|
||||
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
|
||||
handler_message(&self.handler, ctx, req).await
|
||||
})*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_service!(
|
||||
state | api::StateRequest | api::StateResponse,
|
||||
create | api::CreateTaskRequest | api::CreateTaskResponse,
|
||||
start | api::StartRequest | api::StartResponse,
|
||||
delete | api::DeleteRequest | api::DeleteResponse,
|
||||
pids | api::PidsRequest | api::PidsResponse,
|
||||
pause | api::PauseRequest | api::Empty,
|
||||
resume | api::ResumeRequest | api::Empty,
|
||||
kill | api::KillRequest | api::Empty,
|
||||
exec | api::ExecProcessRequest | api::Empty,
|
||||
resize_pty | api::ResizePtyRequest | api::Empty,
|
||||
update | api::UpdateTaskRequest | api::Empty,
|
||||
wait | api::WaitRequest | api::WaitResponse,
|
||||
stats | api::StatsRequest | api::StatsResponse,
|
||||
connect | api::ConnectRequest | api::ConnectResponse,
|
||||
shutdown | api::ShutdownRequest | api::Empty
|
||||
);
|
@ -19,7 +19,7 @@ containerd-shim-protos = { version = "0.2.0", features = ["async"]}
|
||||
go-flag = "0.1.0"
|
||||
libc = "0.2.108"
|
||||
log = "0.4.14"
|
||||
nix = "0.16.0"
|
||||
nix = "0.23.1"
|
||||
protobuf = "2.23.0"
|
||||
sha2 = "=0.9.3"
|
||||
slog = {version = "2.7.0", features = ["std", "release_max_level_trace", "max_level_trace"]}
|
||||
@ -34,11 +34,13 @@ kata-types = { path = "../../../libs/kata-types"}
|
||||
kata-sys-util = { path = "../../../libs/kata-sys-util"}
|
||||
logging = { path = "../../../libs/logging"}
|
||||
oci = { path = "../../../libs/oci" }
|
||||
service = { path = "../service" }
|
||||
|
||||
[build-dependencies]
|
||||
vergen = { version = "6", default-features = false, features = ["build", "git", "rustc"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.2.0"
|
||||
rand = "0.8.4"
|
||||
serial_test = "0.5.1"
|
||||
tests_utils = { path = "../../tests/utils"}
|
||||
|
@ -51,8 +51,8 @@ impl Args {
|
||||
))));
|
||||
}
|
||||
|
||||
validate::verify_cid(&self.id).context("verify cid")?;
|
||||
validate::verify_cid(&self.namespace).context("verify namespace")?;
|
||||
validate::verify_id(&self.id).context("verify container id")?;
|
||||
validate::verify_id(&self.namespace).context("verify namespace")?;
|
||||
|
||||
// Ensure `address` is a valid path.
|
||||
let path = PathBuf::from(self.address.clone())
|
||||
|
@ -7,11 +7,7 @@
|
||||
#[macro_use]
|
||||
extern crate slog;
|
||||
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(slog::o!("subsystem" => "shim"))
|
||||
};
|
||||
}
|
||||
logging::logger_with_subsystem!(sl, "shim");
|
||||
|
||||
mod args;
|
||||
pub use args::Args;
|
||||
|
@ -31,7 +31,7 @@ impl ShimExecutor {
|
||||
}
|
||||
|
||||
pub(crate) fn load_oci_spec(&self, path: &Path) -> Result<oci::Spec> {
|
||||
let spec_file = path.join("config.json");
|
||||
let spec_file = path.join(oci::OCI_SPEC_CONFIG_FILE_NAME);
|
||||
oci::Spec::load(spec_file.to_str().unwrap_or_default()).context("load spec")
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
//
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use containerd_shim_protos::shim::shim::DeleteResponse;
|
||||
use containerd_shim_protos::api;
|
||||
use protobuf::Message;
|
||||
|
||||
use crate::{shim::ShimExecutor, Error};
|
||||
@ -19,8 +19,8 @@ impl ShimExecutor {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn do_cleanup(&self) -> Result<DeleteResponse> {
|
||||
let mut rsp = DeleteResponse::new();
|
||||
fn do_cleanup(&self) -> Result<api::DeleteResponse> {
|
||||
let mut rsp = api::DeleteResponse::new();
|
||||
rsp.set_exit_status(128 + libc::SIGKILL as u32);
|
||||
let mut exited_time = protobuf::well_known_types::Timestamp::new();
|
||||
let seconds = std::time::SystemTime::now()
|
||||
@ -30,42 +30,7 @@ impl ShimExecutor {
|
||||
exited_time.set_seconds(seconds);
|
||||
rsp.set_exited_at(exited_time);
|
||||
|
||||
// TODO: implement cleanup
|
||||
service::ServiceManager::cleanup(&self.args.id).context("cleanup")?;
|
||||
Ok(rsp)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serial_test::serial;
|
||||
use tests_utils::gen_id;
|
||||
|
||||
use super::*;
|
||||
use crate::Args;
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_shim_delete() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let bundle_path = dir.path();
|
||||
std::env::set_current_dir(bundle_path).unwrap();
|
||||
|
||||
let id = gen_id(16);
|
||||
let namespace = gen_id(16);
|
||||
let args = Args {
|
||||
id,
|
||||
namespace,
|
||||
address: "containerd_socket".into(),
|
||||
publish_binary: "containerd".into(),
|
||||
socket: "socket".into(),
|
||||
bundle: bundle_path.to_str().unwrap().into(),
|
||||
debug: false,
|
||||
};
|
||||
|
||||
let executor = ShimExecutor::new(args);
|
||||
|
||||
let resp = executor.do_cleanup().unwrap();
|
||||
assert_eq!(resp.exit_status, 128 + libc::SIGKILL as u32);
|
||||
assert!(resp.exited_at.as_ref().unwrap().seconds > 0);
|
||||
}
|
||||
}
|
||||
|
@ -38,8 +38,11 @@ impl ShimExecutor {
|
||||
info!(sl!(), "start to run");
|
||||
self.args.validate(false).context("validata")?;
|
||||
|
||||
let _server_fd = get_server_fd().context("get server fd")?;
|
||||
// TODO: implement run
|
||||
let server_fd = get_server_fd().context("get server fd")?;
|
||||
let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd)
|
||||
.await
|
||||
.context("new runtime server")?;
|
||||
service_manager.run().await.context("run")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user