diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 5e9bc7696f..e4c57c7093 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -142,7 +142,7 @@ impl AgentService { ) -> Result<()> { let cid = req.container_id.clone(); - kata_sys_util::validate::verify_cid(&cid)?; + kata_sys_util::validate::verify_id(&cid)?; let mut oci_spec = req.OCI.clone(); let use_sandbox_pidns = req.get_sandbox_pidns(); diff --git a/src/libs/kata-sys-util/src/validate.rs b/src/libs/kata-sys-util/src/validate.rs index a58b19289a..0847398cef 100644 --- a/src/libs/kata-sys-util/src/validate.rs +++ b/src/libs/kata-sys-util/src/validate.rs @@ -10,11 +10,11 @@ pub enum Error { InvalidContainerID(String), } -// A container ID must match this regex: +// A container ID or exec ID must match this regex: // // ^[a-zA-Z0-9][a-zA-Z0-9_.-]+$ // -pub fn verify_cid(id: &str) -> Result<(), Error> { +pub fn verify_id(id: &str) -> Result<(), Error> { let mut chars = id.chars(); let valid = match chars.next() { @@ -253,7 +253,7 @@ mod tests { for (i, d) in tests.iter().enumerate() { let msg = format!("test[{}]: {:?}", i, d); - let result = verify_cid(d.id); + let result = verify_id(d.id); let msg = format!("{}, result: {:?}", msg, result); diff --git a/src/libs/kata-types/src/config/runtime.rs b/src/libs/kata-types/src/config/runtime.rs index 7c417fdc27..75b25f166d 100644 --- a/src/libs/kata-types/src/config/runtime.rs +++ b/src/libs/kata-types/src/config/runtime.rs @@ -13,6 +13,10 @@ use crate::{eother, resolve_path, validate_path}; /// Kata runtime configuration information. #[derive(Debug, Default, Deserialize, Serialize)] pub struct Runtime { + /// Runtime name: Plan to support virt-container, linux-container, wasm-container + #[serde(default)] + pub name: String, + /// If enabled, the runtime will log additional debug messages to the system log. #[serde(default, rename = "enable_debug")] pub debug: bool, @@ -238,6 +242,7 @@ vfio_mode = "guest_kernel" fn test_config() { let content = r#" [runtime] +name = "virt-container" enable_debug = true experimental = ["a", "b"] internetworking_model = "macvtap" @@ -255,6 +260,7 @@ field_should_be_ignored = true "#; let config: TomlConfig = TomlConfig::load(content).unwrap(); config.validate().unwrap(); + assert_eq!(&config.runtime.name, "virt-container"); assert!(config.runtime.debug); assert_eq!(config.runtime.experimental.len(), 2); assert_eq!(&config.runtime.experimental[0], "a"); diff --git a/src/libs/logging/src/lib.rs b/src/libs/logging/src/lib.rs index 33f9fee3ee..2c90b5bd0f 100644 --- a/src/libs/logging/src/lib.rs +++ b/src/libs/logging/src/lib.rs @@ -17,6 +17,17 @@ mod log_writer; pub use file_rotate::FileRotator; pub use log_writer::LogWriter; +#[macro_export] +macro_rules! logger_with_subsystem { + ($name: ident, $subsystem: expr) => { + macro_rules! $name { + () => { + slog_scope::logger().new(slog::o!("subsystem" => $subsystem)) + }; + } + }; +} + const LOG_LEVELS: &[(&str, slog::Level)] = &[ ("trace", slog::Level::Trace), ("debug", slog::Level::Debug), diff --git a/src/libs/oci/src/lib.rs b/src/libs/oci/src/lib.rs index f47f2df4be..ace2238ef3 100644 --- a/src/libs/oci/src/lib.rs +++ b/src/libs/oci/src/lib.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; mod serialize; pub use serialize::{to_string, to_writer, Error, Result}; +pub const OCI_SPEC_CONFIG_FILE_NAME: &str = "config.json"; + #[allow(dead_code)] fn is_false(b: bool) -> bool { !b diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 7f3db194da..4dda25cd7c 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -157,7 +157,7 @@ checksum = "cdae996d9638ba03253ffa1c93345a585974a97abbdeab9176c77922f3efc1e8" dependencies = [ "libc", "log", - "nix 0.23.1", + "nix", "regex", ] @@ -174,6 +174,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "common" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "containerd-shim-protos", + "kata-sys-util", + "kata-types", + "lazy_static", + "nix", + "oci", + "protobuf", + "serde_json", + "slog", + "slog-scope", + "strum", + "thiserror", + "tokio", + "ttrpc", +] + [[package]] name = "common-path" version = "1.0.0" @@ -476,6 +498,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -559,7 +587,7 @@ dependencies = [ "kata-types", "lazy_static", "libc", - "nix 0.23.1", + "nix", "oci", "once_cell", "serde_json", @@ -622,6 +650,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux_container" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "common", + "tokio", +] + [[package]] name = "lock_api" version = "0.4.6" @@ -711,19 +749,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "nix" -version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0eaf8df8bab402257e0a5c17a254e4cc1f72a93588a1ddfb5d356c801aa7cb" -dependencies = [ - "bitflags", - "cc", - "cfg-if 0.1.10", - "libc", - "void", -] - [[package]] name = "nix" version = "0.23.1" @@ -930,7 +955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603" dependencies = [ "bytes 1.1.0", - "heck", + "heck 0.3.3", "itertools", "log", "multimap", @@ -1078,6 +1103,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "runtimes" +version = "0.1.0" +dependencies = [ + "anyhow", + "common", + "kata-types", + "lazy_static", + "linux_container", + "logging", + "oci", + "slog", + "slog-scope", + "tokio", + "virt_container", + "wasm_container", +] + [[package]] name = "rustc-demangle" version = "0.1.21" @@ -1170,6 +1213,22 @@ dependencies = [ "syn", ] +[[package]] +name = "service" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "common", + "containerd-shim-protos", + "logging", + "runtimes", + "slog", + "slog-scope", + "tokio", + "ttrpc", +] + [[package]] name = "sha2" version = "0.9.3" @@ -1196,10 +1255,12 @@ dependencies = [ "libc", "log", "logging", - "nix 0.16.1", + "nix", "oci", "protobuf", + "rand", "serial_test", + "service", "sha2", "slog", "slog-async", @@ -1213,6 +1274,15 @@ dependencies = [ "vergen", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.5" @@ -1287,6 +1357,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "strum" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e96acfc1b70604b8b2f1ffa4c57e59176c7dbb05d556c71ecd2f5498a1dee7f8" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef" +dependencies = [ + "heck 0.4.0", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subprocess" version = "0.2.8" @@ -1411,7 +1503,9 @@ dependencies = [ "memchr", "mio", "num_cpus", + "once_cell", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "winapi", @@ -1461,7 +1555,7 @@ dependencies = [ "futures 0.3.21", "libc", "log", - "nix 0.23.1", + "nix", "protobuf", "protobuf-codegen-pure", "thiserror", @@ -1580,10 +1674,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] -name = "void" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +name = "virt_container" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "common", + "kata-types", + "tokio", +] [[package]] name = "vsock" @@ -1592,7 +1691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e32675ee2b3ce5df274c0ab52d19b28789632406277ca26bffee79a8e27dc133" dependencies = [ "libc", - "nix 0.23.1", + "nix", ] [[package]] @@ -1607,6 +1706,16 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm_container" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "common", + "tokio", +] + [[package]] name = "which" version = "4.2.5" diff --git a/src/runtime-rs/crates/agent/src/sock/mod.rs b/src/runtime-rs/crates/agent/src/sock/mod.rs index 52ea993b50..3ec4da6aa9 100644 --- a/src/runtime-rs/crates/agent/src/sock/mod.rs +++ b/src/runtime-rs/crates/agent/src/sock/mod.rs @@ -120,12 +120,12 @@ fn parse(address: &str, port: u32) -> Result { let url = Url::parse(address).context("parse url")?; match url.scheme() { VSOCK_SCHEME => { - let cid = url + let vsock_cid = url .host_str() .unwrap_or_default() .parse::() - .context("parse cid")?; - Ok(SockType::Vsock(Vsock::new(cid, port))) + .context("parse vsock cid")?; + Ok(SockType::Vsock(Vsock::new(vsock_cid, port))) } HYBRID_VSOCK_SCHEME => { let path: Vec<&str> = url.path().split(':').collect(); diff --git a/src/runtime-rs/crates/agent/src/sock/vsock.rs b/src/runtime-rs/crates/agent/src/sock/vsock.rs index 7f6a59d89a..9b62bb9766 100644 --- a/src/runtime-rs/crates/agent/src/sock/vsock.rs +++ b/src/runtime-rs/crates/agent/src/sock/vsock.rs @@ -14,13 +14,13 @@ unsafe impl Sync for Vsock {} #[derive(Debug, PartialEq)] pub struct Vsock { - cid: u32, + vsock_cid: u32, port: u32, } impl Vsock { - pub fn new(cid: u32, port: u32) -> Self { - Self { cid, port } + pub fn new(vsock_cid: u32, port: u32) -> Self { + Self { vsock_cid, port } } } diff --git a/src/runtime-rs/crates/runtimes/Cargo.toml b/src/runtime-rs/crates/runtimes/Cargo.toml new file mode 100644 index 0000000000..304a7639bc --- /dev/null +++ b/src/runtime-rs/crates/runtimes/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "runtimes" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +[dependencies] +anyhow = "^1.0" +lazy_static = "1.4.0" +slog = "2.5.2" +slog-scope = "4.4.0" +tokio = { version = "1.8.0", features = ["rt-multi-thread"] } + +common = { path = "./common" } +kata-types = { path = "../../../libs/kata-types" } +logging = { path = "../../../libs/logging"} +oci = { path = "../../../libs/oci" } + +# runtime handler +linux_container = { path = "./linux_container", optional = true } +virt_container = { path = "./virt_container", optional = true } +wasm_container = { path = "./wasm_container", optional = true } + +[features] +default = ["virt"] +linux = ["linux_container"] +virt = ["virt_container"] +wasm = ["wasm_container"] diff --git a/src/runtime-rs/crates/runtimes/common/Cargo.toml b/src/runtime-rs/crates/runtimes/common/Cargo.toml new file mode 100644 index 0000000000..f2bff39c84 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "common" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "^1.0" +async-trait = "0.1.48" +containerd-shim-protos = { version = "0.2.0", features = ["async"]} +lazy_static = "1.4.0" +nix = "0.23.1" +protobuf = "2.23.0" +serde_json = "1.0.39" +slog = "2.5.2" +slog-scope = "4.4.0" +strum = { version = "0.24.0", features = ["derive"] } +thiserror = "^1.0" +tokio = { version = "1.8.0", features = ["rt-multi-thread", "process", "fs"] } +ttrpc = { version = "0.6.0" } + +kata-sys-util = { path = "../../../../libs/kata-sys-util" } +kata-types = { path = "../../../../libs/kata-types" } +oci = { path = "../../../../libs/oci" } diff --git a/src/runtime-rs/crates/runtimes/common/src/container_manager.rs b/src/runtime-rs/crates/runtimes/common/src/container_manager.rs new file mode 100644 index 0000000000..aeba770a66 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/container_manager.rs @@ -0,0 +1,40 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use anyhow::Result; +use async_trait::async_trait; + +use crate::types::{ + ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, + ProcessExitStatus, ProcessStateInfo, ResizePTYRequest, ShutdownRequest, StatsInfo, + UpdateRequest, PID, +}; + +#[async_trait] +pub trait ContainerManager: Send + Sync { + // container lifecycle + async fn create_container(&self, config: ContainerConfig) -> Result; + async fn pause_container(&self, container_id: &ContainerID) -> Result<()>; + async fn resume_container(&self, container_id: &ContainerID) -> Result<()>; + async fn stats_container(&self, container_id: &ContainerID) -> Result; + async fn update_container(&self, req: UpdateRequest) -> Result<()>; + async fn connect_container(&self, container_id: &ContainerID) -> Result; + + // process lifecycle + async fn close_process_io(&self, process_id: &ContainerProcess) -> Result<()>; + async fn delete_process(&self, process_id: &ContainerProcess) -> Result; + async fn exec_process(&self, req: ExecProcessRequest) -> Result<()>; + async fn kill_process(&self, req: &KillRequest) -> Result<()>; + async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()>; + async fn start_process(&self, process_id: &ContainerProcess) -> Result; + async fn state_process(&self, process_id: &ContainerProcess) -> Result; + async fn wait_process(&self, process_id: &ContainerProcess) -> Result; + + // utility + async fn pid(&self) -> Result; + async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool; + async fn is_sandbox_container(&self, process_id: &ContainerProcess) -> bool; +} diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs new file mode 100644 index 0000000000..2ec03c4c6c --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/error.rs @@ -0,0 +1,17 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use crate::types::{ContainerProcess, Response}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("failed to find container {0}")] + ContainerNotFound(String), + #[error("failed to find process {0}")] + ProcessNotFound(ContainerProcess), + #[error("unexpected response {0} to shim {1}")] + UnexpectedResponse(Response, String), +} diff --git a/src/runtime-rs/crates/runtimes/common/src/lib.rs b/src/runtime-rs/crates/runtimes/common/src/lib.rs new file mode 100644 index 0000000000..36977964ad --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/lib.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +mod container_manager; +pub use container_manager::ContainerManager; +pub mod error; +pub mod message; +mod runtime_handler; +pub use runtime_handler::{RuntimeHandler, RuntimeInstance}; +mod sandbox; +pub use sandbox::Sandbox; +pub mod types; diff --git a/src/runtime-rs/crates/runtimes/common/src/message.rs b/src/runtime-rs/crates/runtimes/common/src/message.rs new file mode 100644 index 0000000000..ff6ee960d7 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/message.rs @@ -0,0 +1,44 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use anyhow::Result; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +/// message receiver buffer size +const MESSAGE_RECEIVER_BUFFER_SIZE: usize = 1; + +#[derive(Debug)] +pub enum Action { + Start, + Stop, + Shutdown, +} + +#[derive(Debug)] +pub struct Message { + pub action: Action, + pub resp_sender: Option>>, +} + +impl Message { + pub fn new(action: Action) -> Self { + Message { + action, + resp_sender: None, + } + } + + pub fn new_with_receiver(action: Action) -> (Receiver>, Self) { + let (resp_sender, receiver) = channel(MESSAGE_RECEIVER_BUFFER_SIZE); + ( + receiver, + Message { + action, + resp_sender: Some(resp_sender), + }, + ) + } +} diff --git a/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs b/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs new file mode 100644 index 0000000000..d74b83b1d0 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs @@ -0,0 +1,38 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::mpsc::Sender; + +use crate::{message::Message, ContainerManager, Sandbox}; + +#[derive(Clone)] +pub struct RuntimeInstance { + pub sandbox: Arc, + pub container_manager: Arc, +} + +#[async_trait] +pub trait RuntimeHandler: Send + Sync { + fn init() -> Result<()> + where + Self: Sized; + + fn name() -> String + where + Self: Sized; + + fn new_handler() -> Arc + where + Self: Sized; + + async fn new_instance(&self, sid: &str, msg_sender: Sender) + -> Result; + + fn cleanup(&self, id: &str) -> Result<()>; +} diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs new file mode 100644 index 0000000000..699fc1977a --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use anyhow::Result; +use async_trait::async_trait; + +#[async_trait] +pub trait Sandbox: Send + Sync { + async fn start(&self) -> Result<()>; + async fn stop(&self) -> Result<()>; + async fn cleanup(&self, container_id: &str) -> Result<()>; + async fn shutdown(&self) -> Result<()>; +} diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs new file mode 100644 index 0000000000..e398735ff7 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -0,0 +1,219 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +mod trans_from_shim; +mod trans_into_shim; + +use std::fmt; + +use anyhow::{Context, Result}; +use kata_sys_util::validate; +use kata_types::mount::Mount; +use strum::Display; + +/// Request: request from shim +/// Request and Response messages need to be paired +#[derive(Debug, Clone, Display)] +pub enum Request { + CreateContainer(ContainerConfig), + CloseProcessIO(ContainerProcess), + DeleteProcess(ContainerProcess), + ExecProcess(ExecProcessRequest), + KillProcess(KillRequest), + WaitProcess(ContainerProcess), + StartProcess(ContainerProcess), + StateProcess(ContainerProcess), + ShutdownContainer(ShutdownRequest), + PauseContainer(ContainerID), + ResumeContainer(ContainerID), + ResizeProcessPTY(ResizePTYRequest), + StatsContainer(ContainerID), + UpdateContainer(UpdateRequest), + Pid, + ConnectContainer(ContainerID), +} + +/// Response: response to shim +/// Request and Response messages need to be paired +#[derive(Debug, Clone, Display)] +pub enum Response { + CreateContainer(PID), + CloseProcessIO, + DeleteProcess(ProcessStateInfo), + ExecProcess, + KillProcess, + WaitProcess(ProcessExitStatus), + StartProcess(PID), + StateProcess(ProcessStateInfo), + ShutdownContainer, + PauseContainer, + ResumeContainer, + ResizeProcessPTY, + StatsContainer(StatsInfo), + UpdateContainer, + Pid(PID), + ConnectContainer(PID), +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum ProcessType { + Container, + Exec, +} + +#[derive(Clone, Debug)] +pub struct ContainerID { + pub container_id: String, +} + +impl ContainerID { + pub fn new(container_id: &str) -> Result { + validate::verify_id(container_id).context("verify container id")?; + Ok(Self { + container_id: container_id.to_string(), + }) + } +} + +#[derive(Clone, Debug)] +pub struct ContainerProcess { + pub container_id: ContainerID, + pub exec_id: String, + pub process_type: ProcessType, +} + +impl fmt::Display for ContainerProcess { + fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", &self) + } +} + +impl ContainerProcess { + pub fn new(container_id: &str, exec_id: &str) -> Result { + let (exec_id, process_type) = if exec_id.is_empty() || container_id == exec_id { + ("".to_string(), ProcessType::Container) + } else { + validate::verify_id(exec_id).context("verify exec id")?; + (exec_id.to_string(), ProcessType::Exec) + }; + Ok(Self { + container_id: ContainerID::new(container_id)?, + exec_id, + process_type, + }) + } +} +#[derive(Debug, Clone)] +pub struct ContainerConfig { + pub container_id: String, + pub bundle: String, + pub rootfs_mounts: Vec, + pub terminal: bool, + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, +} + +#[derive(Debug, Clone)] +pub struct PID { + pub pid: u32, +} + +impl PID { + pub fn new(pid: u32) -> Self { + Self { pid } + } +} + +#[derive(Debug, Clone)] +pub struct KillRequest { + pub process_id: ContainerProcess, + pub signal: u32, + pub all: bool, +} + +#[derive(Debug, Clone)] +pub struct ShutdownRequest { + pub container_id: String, + pub is_now: bool, +} + +#[derive(Debug, Clone)] +pub struct ResizePTYRequest { + pub process_id: ContainerProcess, + pub width: u32, + pub height: u32, +} + +#[derive(Debug, Clone)] +pub struct ExecProcessRequest { + pub process_id: ContainerProcess, + pub terminal: bool, + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + pub spec_type_url: String, + pub spec_value: Vec, +} + +#[derive(Clone, Copy, PartialEq, Debug)] +pub enum ProcessStatus { + Unknown = 0, + Created = 1, + Running = 2, + Stopped = 3, + Paused = 4, + Pausing = 5, +} + +#[derive(Debug, Clone)] +pub struct ProcessStateInfo { + pub container_id: String, + pub exec_id: String, + pub pid: PID, + pub bundle: String, + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + pub terminal: bool, + pub status: ProcessStatus, + pub exit_status: i32, + pub exited_at: Option, +} + +#[derive(Debug, Clone, Default)] +pub struct ProcessExitStatus { + pub exit_code: i32, + pub exit_time: Option, +} + +impl ProcessExitStatus { + pub fn new() -> Self { + Self::default() + } + + pub fn update_exit_code(&mut self, exit_code: i32) { + self.exit_code = exit_code; + self.exit_time = Some(std::time::SystemTime::now()); + } +} + +#[derive(Debug, Clone)] +pub struct StatsInfoValue { + pub type_url: String, + pub value: Vec, +} + +#[derive(Debug, Clone)] +pub struct StatsInfo { + pub value: Option, +} + +#[derive(Debug, Clone)] +pub struct UpdateRequest { + pub container_id: String, + pub value: Vec, +} diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs new file mode 100644 index 0000000000..c26bb68281 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -0,0 +1,198 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + convert::{From, TryFrom}, + path::PathBuf, +}; + +use anyhow::{Context, Result}; +use containerd_shim_protos::api; +use kata_types::mount::Mount; + +use super::{ + ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request, + ResizePTYRequest, ShutdownRequest, UpdateRequest, +}; + +fn trans_from_shim_mount(from: api::Mount) -> Mount { + let options = from.options.to_vec(); + let mut read_only = false; + for o in &options { + if o == "ro" { + read_only = true; + break; + } + } + + Mount { + source: from.source.clone(), + destination: PathBuf::from(&from.target), + fs_type: from.field_type, + options, + device_id: None, + host_shared_fs_path: None, + read_only, + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::CreateTaskRequest) -> Result { + Ok(Request::CreateContainer(ContainerConfig { + container_id: from.id.clone(), + bundle: from.bundle.clone(), + rootfs_mounts: from + .rootfs + .to_vec() + .into_iter() + .map(trans_from_shim_mount) + .collect(), + terminal: from.terminal, + stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), + stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()), + stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()), + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::CloseIORequest) -> Result { + Ok(Request::CloseProcessIO( + ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + )) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::DeleteRequest) -> Result { + Ok(Request::DeleteProcess( + ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + )) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::ExecProcessRequest) -> Result { + let spec = from.get_spec(); + Ok(Request::ExecProcess(ExecProcessRequest { + process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + terminal: from.terminal, + stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), + stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()), + stderr: (!from.stderr.is_empty()).then(|| from.stderr.clone()), + spec_type_url: spec.get_type_url().to_string(), + spec_value: spec.get_value().to_vec(), + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::KillRequest) -> Result { + Ok(Request::KillProcess(KillRequest { + process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + signal: from.signal, + all: from.all, + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::WaitRequest) -> Result { + Ok(Request::WaitProcess( + ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + )) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::StartRequest) -> Result { + Ok(Request::StartProcess( + ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + )) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::StateRequest) -> Result { + Ok(Request::StateProcess( + ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + )) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::ShutdownRequest) -> Result { + Ok(Request::ShutdownContainer(ShutdownRequest { + container_id: from.id.to_string(), + is_now: from.now, + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::ResizePtyRequest) -> Result { + Ok(Request::ResizeProcessPTY(ResizePTYRequest { + process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + width: from.width, + height: from.height, + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::PauseRequest) -> Result { + Ok(Request::PauseContainer(ContainerID::new(&from.id)?)) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::ResumeRequest) -> Result { + Ok(Request::ResumeContainer(ContainerID::new(&from.id)?)) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::StatsRequest) -> Result { + Ok(Request::StatsContainer(ContainerID::new(&from.id)?)) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::UpdateTaskRequest) -> Result { + Ok(Request::UpdateContainer(UpdateRequest { + container_id: from.id.to_string(), + value: from.get_resources().get_value().to_vec(), + })) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(_from: api::PidsRequest) -> Result { + Ok(Request::Pid) + } +} + +impl TryFrom for Request { + type Error = anyhow::Error; + fn try_from(from: api::ConnectRequest) -> Result { + Ok(Request::ConnectContainer(ContainerID::new(&from.id)?)) + } +} diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs new file mode 100644 index 0000000000..3c3134e8fd --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -0,0 +1,242 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + any::type_name, + convert::{Into, TryFrom, TryInto}, + time, +}; + +use anyhow::{anyhow, Result}; +use containerd_shim_protos::api; + +use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response}; +use crate::error::Error; + +fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::Timestamp { + let mut proto_time = ::protobuf::well_known_types::Timestamp::new(); + proto_time.set_seconds( + time.duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .try_into() + .unwrap_or_default(), + ); + proto_time +} + +fn option_system_time_into( + time: Option, +) -> ::protobuf::SingularPtrField<::protobuf::well_known_types::Timestamp> { + match time { + Some(v) => ::protobuf::SingularPtrField::some(system_time_into(v)), + None => ::protobuf::SingularPtrField::none(), + } +} + +impl From for api::WaitResponse { + fn from(from: ProcessExitStatus) -> Self { + Self { + exit_status: from.exit_code as u32, + exited_at: option_system_time_into(from.exit_time), + ..Default::default() + } + } +} + +impl From for api::Status { + fn from(from: ProcessStatus) -> Self { + match from { + ProcessStatus::Unknown => api::Status::UNKNOWN, + ProcessStatus::Created => api::Status::CREATED, + ProcessStatus::Running => api::Status::RUNNING, + ProcessStatus::Stopped => api::Status::STOPPED, + ProcessStatus::Paused => api::Status::PAUSED, + ProcessStatus::Pausing => api::Status::PAUSING, + } + } +} +impl From for api::StateResponse { + fn from(from: ProcessStateInfo) -> Self { + Self { + id: from.container_id.clone(), + bundle: from.bundle.clone(), + pid: from.pid.pid, + status: from.status.into(), + stdin: from.stdin.unwrap_or_default(), + stdout: from.stdout.unwrap_or_default(), + stderr: from.stderr.unwrap_or_default(), + terminal: from.terminal, + exit_status: from.exit_status as u32, + exited_at: option_system_time_into(from.exited_at), + exec_id: from.exec_id, + ..Default::default() + } + } +} + +impl From for api::DeleteResponse { + fn from(from: ProcessStateInfo) -> Self { + Self { + pid: from.pid.pid, + exit_status: from.exit_status as u32, + exited_at: option_system_time_into(from.exited_at), + ..Default::default() + } + } +} + +impl TryFrom for api::CreateTaskResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::CreateContainer(resp) => Ok(Self { + pid: resp.pid, + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::DeleteResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::DeleteProcess(resp) => Ok(resp.into()), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::WaitResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::WaitProcess(resp) => Ok(resp.into()), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::StartResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::StartProcess(resp) => Ok(api::StartResponse { + pid: resp.pid, + ..Default::default() + }), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::StateResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::StateProcess(resp) => Ok(resp.into()), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::StatsResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + let mut any = ::protobuf::well_known_types::Any::new(); + let mut response = api::StatsResponse::new(); + match from { + Response::StatsContainer(resp) => { + if let Some(value) = resp.value { + any.set_type_url(value.type_url); + any.set_value(value.value); + response.set_stats(any); + } + Ok(response) + } + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::PidsResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::Pid(resp) => { + let mut processes: Vec = vec![]; + let mut p_info = api::ProcessInfo::new(); + let mut res = api::PidsResponse::new(); + p_info.set_pid(resp.pid); + processes.push(p_info); + let v = protobuf::RepeatedField::::from_vec(processes); + res.set_processes(v); + Ok(res) + } + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::ConnectResponse { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::ConnectContainer(resp) => { + let mut res = api::ConnectResponse::new(); + res.set_shim_pid(resp.pid); + Ok(res) + } + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} + +impl TryFrom for api::Empty { + type Error = anyhow::Error; + fn try_from(from: Response) -> Result { + match from { + Response::CloseProcessIO => Ok(api::Empty::new()), + Response::ExecProcess => Ok(api::Empty::new()), + Response::KillProcess => Ok(api::Empty::new()), + Response::ShutdownContainer => Ok(api::Empty::new()), + Response::PauseContainer => Ok(api::Empty::new()), + Response::ResumeContainer => Ok(api::Empty::new()), + Response::ResizeProcessPTY => Ok(api::Empty::new()), + Response::UpdateContainer => Ok(api::Empty::new()), + _ => Err(anyhow!(Error::UnexpectedResponse( + from, + type_name::().to_string() + ))), + } + } +} diff --git a/src/runtime-rs/crates/runtimes/linux_container/Cargo.toml b/src/runtime-rs/crates/runtimes/linux_container/Cargo.toml new file mode 100644 index 0000000000..81d4e3e03a --- /dev/null +++ b/src/runtime-rs/crates/runtimes/linux_container/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "linux_container" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +[dependencies] +anyhow = "^1.0" +async-trait = "0.1.48" +tokio = { version = "1.8.0" } + +common = { path = "../common" } diff --git a/src/runtime-rs/crates/runtimes/linux_container/src/lib.rs b/src/runtime-rs/crates/runtimes/linux_container/src/lib.rs new file mode 100644 index 0000000000..d50de90b17 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/linux_container/src/lib.rs @@ -0,0 +1,42 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use common::{message::Message, RuntimeHandler, RuntimeInstance}; +use tokio::sync::mpsc::Sender; + +unsafe impl Send for LinuxContainer {} +unsafe impl Sync for LinuxContainer {} +pub struct LinuxContainer {} + +#[async_trait] +impl RuntimeHandler for LinuxContainer { + fn init() -> Result<()> { + Ok(()) + } + + fn name() -> String { + "linux_container".to_string() + } + + fn new_handler() -> Arc { + Arc::new(LinuxContainer {}) + } + + async fn new_instance( + &self, + _sid: &str, + _msg_sender: Sender, + ) -> Result { + todo!() + } + + fn cleanup(&self, _id: &str) -> Result<()> { + todo!() + } +} diff --git a/src/runtime-rs/crates/runtimes/src/lib.rs b/src/runtime-rs/crates/runtimes/src/lib.rs new file mode 100644 index 0000000000..64c57feeae --- /dev/null +++ b/src/runtime-rs/crates/runtimes/src/lib.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +#[macro_use] +extern crate slog; + +logging::logger_with_subsystem!(sl, "runtimes"); + +mod manager; +pub use manager::RuntimeHandlerManager; diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs new file mode 100644 index 0000000000..f3006f8561 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -0,0 +1,251 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::sync::Arc; + +use anyhow::{anyhow, Context, Result}; +use common::{ + message::Message, + types::{Request, Response}, + RuntimeHandler, RuntimeInstance, +}; +use kata_types::{annotations::Annotation, config::TomlConfig}; +use tokio::sync::{mpsc::Sender, RwLock}; + +#[cfg(feature = "linux")] +use linux_container::LinuxContainer; +#[cfg(feature = "virt")] +use virt_container::VirtContainer; +#[cfg(feature = "wasm")] +use wasm_container::WasmContainer; + +struct RuntimeHandlerManagerInner { + id: String, + msg_sender: Sender, + runtime_instance: Option, +} + +impl RuntimeHandlerManagerInner { + fn new(id: &str, msg_sender: Sender) -> Result { + Ok(Self { + id: id.to_string(), + msg_sender, + runtime_instance: None, + }) + } + + async fn init_runtime_handler(&mut self, runtime_name: &str) -> Result<()> { + info!(sl!(), "new runtime handler {}", runtime_name); + + let runtime_handler = match runtime_name { + #[cfg(feature = "linux")] + name if name == LinuxContainer::name() => { + LinuxContainer::init().context("init linux container")?; + LinuxContainer::new_handler() + } + #[cfg(feature = "wasm")] + name if name == WasmContainer::name() => { + WasmContainer::init().context("init wasm container")?; + WasmContainer::new_handler() + } + #[cfg(feature = "virt")] + name if name == VirtContainer::name() => { + VirtContainer::init().context("init virt container")?; + VirtContainer::new_handler() + } + _ => return Err(anyhow!("Unsupported runtime: {}", runtime_name)), + }; + let runtime_instance = runtime_handler + .new_instance(&self.id, self.msg_sender.clone()) + .await + .context("new runtime instance")?; + + // start sandbox + runtime_instance + .sandbox + .start() + .await + .context("start sandbox")?; + self.runtime_instance = Some(runtime_instance); + Ok(()) + } + + async fn try_init(&mut self, spec: &oci::Spec) -> Result<()> { + // return if runtime instance has init + if self.runtime_instance.is_some() { + return Ok(()); + } + + let config = load_config(spec).context("load config")?; + self.init_runtime_handler(&config.runtime.name) + .await + .context("init runtime handler")?; + + Ok(()) + } + + fn get_runtime_instance(&self) -> Option { + self.runtime_instance.clone() + } +} + +unsafe impl Send for RuntimeHandlerManager {} +unsafe impl Sync for RuntimeHandlerManager {} +pub struct RuntimeHandlerManager { + inner: Arc>, +} + +impl RuntimeHandlerManager { + pub async fn new(id: &str, msg_sender: Sender) -> Result { + Ok(Self { + inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new( + id, msg_sender, + )?)), + }) + } + + pub fn cleanup(_id: &str) -> Result<()> { + // TODO: load runtime from persist and cleanup + Ok(()) + } + + pub async fn handler_message(&self, req: Request) -> Result { + if let Request::CreateContainer(req) = req { + // get oci spec + let bundler_path = format!("{}/{}", req.bundle, oci::OCI_SPEC_CONFIG_FILE_NAME); + let spec = oci::Spec::load(&bundler_path).context("load spec")?; + + let mut inner = self.inner.write().await; + inner + .try_init(&spec) + .await + .context("try init runtime handler")?; + + let instance = inner + .get_runtime_instance() + .ok_or_else(|| anyhow!("runtime not ready"))?; + + let shim_pid = instance + .container_manager + .create_container(req) + .await + .context("create container")?; + Ok(Response::CreateContainer(shim_pid)) + } else { + self.handler_request(req).await.context("handler request") + } + } + + pub async fn handler_request(&self, req: Request) -> Result { + let inner = self.inner.read().await; + let instance = inner + .get_runtime_instance() + .ok_or_else(|| anyhow!("runtime not ready"))?; + let sandbox = instance.sandbox; + let cm = instance.container_manager; + + match req { + Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)), + Request::CloseProcessIO(process_id) => { + cm.close_process_io(&process_id).await.context("close io")?; + Ok(Response::CloseProcessIO) + } + Request::DeleteProcess(process_id) => { + let resp = cm.delete_process(&process_id).await.context("do delete")?; + Ok(Response::DeleteProcess(resp)) + } + Request::ExecProcess(req) => { + cm.exec_process(req).await.context("exec")?; + Ok(Response::ExecProcess) + } + Request::KillProcess(req) => { + cm.kill_process(&req).await.context("kill process")?; + Ok(Response::KillProcess) + } + Request::ShutdownContainer(req) => { + if cm.need_shutdown_sandbox(&req).await { + sandbox.shutdown().await.context("do shutdown")?; + } + Ok(Response::ShutdownContainer) + } + Request::WaitProcess(process_id) => { + let exit_status = cm.wait_process(&process_id).await.context("wait process")?; + if cm.is_sandbox_container(&process_id).await { + sandbox.stop().await.context("stop sandbox")?; + } + Ok(Response::WaitProcess(exit_status)) + } + Request::StartProcess(process_id) => { + let shim_pid = cm + .start_process(&process_id) + .await + .context("start process")?; + Ok(Response::StartProcess(shim_pid)) + } + + Request::StateProcess(process_id) => { + let state = cm + .state_process(&process_id) + .await + .context("state process")?; + Ok(Response::StateProcess(state)) + } + Request::PauseContainer(container_id) => { + cm.pause_container(&container_id) + .await + .context("pause container")?; + Ok(Response::PauseContainer) + } + Request::ResumeContainer(container_id) => { + cm.resume_container(&container_id) + .await + .context("resume container")?; + Ok(Response::ResumeContainer) + } + Request::ResizeProcessPTY(req) => { + cm.resize_process_pty(&req).await.context("resize pty")?; + Ok(Response::ResizeProcessPTY) + } + Request::StatsContainer(container_id) => { + let stats = cm + .stats_container(&container_id) + .await + .context("stats container")?; + Ok(Response::StatsContainer(stats)) + } + Request::UpdateContainer(req) => { + cm.update_container(req).await.context("update container")?; + Ok(Response::UpdateContainer) + } + Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)), + Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer( + cm.connect_container(&container_id) + .await + .context("connect")?, + )), + } + } +} + +/// Config override ordering(high to low): +/// 1. podsandbox annotation +/// 2. shimv2 create task option +/// TODO: https://github.com/kata-containers/kata-containers/issues/3961 +/// 3. environment +fn load_config(spec: &oci::Spec) -> Result { + const KATA_CONF_FILE: &str = "KATA_CONF_FILE"; + let annotation = Annotation::new(spec.annotations.clone()); + let config_path = if let Some(path) = annotation.get_sandbox_config_path() { + path + } else if let Ok(path) = std::env::var(KATA_CONF_FILE) { + path + } else { + String::from("") + }; + info!(sl!(), "get config path {:?}", &config_path); + let (toml_config, _) = TomlConfig::load_from_file(&config_path).context("load toml config")?; + Ok(toml_config) +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml new file mode 100644 index 0000000000..8e63c01c1a --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "virt_container" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +[dependencies] +anyhow = "^1.0" +async-trait = "0.1.48" +tokio = { version = "1.8.0" } + +common = { path = "../common" } +kata-types = { path = "../../../../libs/kata-types" } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs new file mode 100644 index 0000000000..8419a93828 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs @@ -0,0 +1,47 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use common::{message::Message, RuntimeHandler, RuntimeInstance}; +use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig}; +use tokio::sync::mpsc::Sender; + +unsafe impl Send for VirtContainer {} +unsafe impl Sync for VirtContainer {} +pub struct VirtContainer {} + +#[async_trait] +impl RuntimeHandler for VirtContainer { + fn init() -> Result<()> { + // register + let dragonball_config = Arc::new(DragonballConfig::new()); + register_hypervisor_plugin("dragonball", dragonball_config); + Ok(()) + } + + fn name() -> String { + "virt_container".to_string() + } + + fn new_handler() -> Arc { + Arc::new(VirtContainer {}) + } + + async fn new_instance( + &self, + _sid: &str, + _msg_sender: Sender, + ) -> Result { + todo!() + } + + fn cleanup(&self, _id: &str) -> Result<()> { + // TODO + Ok(()) + } +} diff --git a/src/runtime-rs/crates/runtimes/wasm_container/Cargo.toml b/src/runtime-rs/crates/runtimes/wasm_container/Cargo.toml new file mode 100644 index 0000000000..9dfce237e2 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/wasm_container/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "wasm_container" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +[dependencies] +anyhow = "^1.0" +async-trait = "0.1.48" +tokio = { version = "1.8.0" } + +common = { path = "../common" } diff --git a/src/runtime-rs/crates/runtimes/wasm_container/src/lib.rs b/src/runtime-rs/crates/runtimes/wasm_container/src/lib.rs new file mode 100644 index 0000000000..c92cd965a0 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/wasm_container/src/lib.rs @@ -0,0 +1,42 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use common::{message::Message, RuntimeHandler, RuntimeInstance}; +use tokio::sync::mpsc::Sender; + +unsafe impl Send for WasmContainer {} +unsafe impl Sync for WasmContainer {} +pub struct WasmContainer {} + +#[async_trait] +impl RuntimeHandler for WasmContainer { + fn init() -> Result<()> { + Ok(()) + } + + fn name() -> String { + "wasm_container".to_string() + } + + fn new_handler() -> Arc { + Arc::new(WasmContainer {}) + } + + async fn new_instance( + &self, + _sid: &str, + _msg_sender: Sender, + ) -> Result { + todo!() + } + + fn cleanup(&self, _id: &str) -> Result<()> { + todo!() + } +} diff --git a/src/runtime-rs/crates/service/Cargo.toml b/src/runtime-rs/crates/service/Cargo.toml new file mode 100644 index 0000000000..b3aa85a64f --- /dev/null +++ b/src/runtime-rs/crates/service/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "service" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" + +[dependencies] +anyhow = "^1.0" +async-trait = "0.1.48" +slog = "2.5.2" +slog-scope = "4.4.0" +tokio = { version = "1.8.0", features = ["rt-multi-thread"] } +ttrpc = { version = "0.6.0" } + +common = { path = "../runtimes/common" } +containerd-shim-protos = { version = "0.2.0", features = ["async"]} +logging = { path = "../../../libs/logging"} +runtimes = { path = "../runtimes" } diff --git a/src/runtime-rs/crates/service/src/lib.rs b/src/runtime-rs/crates/service/src/lib.rs new file mode 100644 index 0000000000..1f28a8009c --- /dev/null +++ b/src/runtime-rs/crates/service/src/lib.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +#[macro_use] +extern crate slog; + +logging::logger_with_subsystem!(sl, "service"); + +mod manager; +pub use manager::ServiceManager; +mod task_service; diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs new file mode 100644 index 0000000000..dac1b2275f --- /dev/null +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -0,0 +1,107 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + os::unix::io::{FromRawFd, RawFd}, + sync::Arc, +}; + +use anyhow::{Context, Result}; +use common::message::{Action, Message}; +use containerd_shim_protos::shim_async; +use runtimes::RuntimeHandlerManager; +use tokio::sync::mpsc::{channel, Receiver}; +use ttrpc::asynchronous::Server; + +use crate::task_service::TaskService; + +/// message buffer size +const MESSAGE_BUFFER_SIZE: usize = 8; + +pub struct ServiceManager { + receiver: Option>, + handler: Arc, + task_server: Option, +} + +impl ServiceManager { + pub async fn new(id: &str, task_server_fd: RawFd) -> Result { + let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); + let handler = Arc::new( + RuntimeHandlerManager::new(id, sender) + .await + .context("new runtime handler")?, + ); + let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; + task_server = task_server.set_domain_unix(); + Ok(Self { + receiver: Some(receiver), + handler, + task_server: Some(task_server), + }) + } + + pub async fn run(&mut self) -> Result<()> { + info!(sl!(), "begin to run service"); + + self.start().await.context("start")?; + let mut rx = self.receiver.take(); + if let Some(rx) = rx.as_mut() { + while let Some(r) = rx.recv().await { + info!(sl!(), "receive action {:?}", &r.action); + let result = match r.action { + Action::Start => self.start().await.context("start listen"), + Action::Stop => self.stop_listen().await.context("stop listen"), + Action::Shutdown => { + self.stop_listen().await.context("stop listen")?; + break; + } + }; + + if let Some(ref sender) = r.resp_sender { + sender.send(result).await.context("send response")?; + } + } + } + + info!(sl!(), "end to run service"); + + Ok(()) + } + + pub fn cleanup(id: &str) -> Result<()> { + RuntimeHandlerManager::cleanup(id) + } + + async fn start(&mut self) -> Result<()> { + let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) + as Box); + let task_server = self.task_server.take(); + let task_server = match task_server { + Some(t) => { + let mut t = t.register_service(shim_async::create_task(task_service)); + t.start().await.context("task server start")?; + Some(t) + } + None => None, + }; + self.task_server = task_server; + Ok(()) + } + + async fn stop_listen(&mut self) -> Result<()> { + let task_server = self.task_server.take(); + let task_server = match task_server { + Some(mut t) => { + t.stop_listen().await; + Some(t) + } + None => None, + }; + self.task_server = task_server; + Ok(()) + } +} diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs new file mode 100644 index 0000000000..77c368d6c9 --- /dev/null +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -0,0 +1,82 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, +}; + +use async_trait::async_trait; +use common::types::{Request, Response}; +use containerd_shim_protos::{api, shim_async}; +use ttrpc::{self, r#async::TtrpcContext}; + +use runtimes::RuntimeHandlerManager; + +pub(crate) struct TaskService { + handler: Arc, +} + +impl TaskService { + pub(crate) fn new(handler: Arc) -> Self { + Self { handler } + } +} + +async fn handler_message( + s: &RuntimeHandlerManager, + ctx: &TtrpcContext, + req: TtrpcReq, +) -> ttrpc::Result +where + Request: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, +{ + let r = req + .try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?; + let logger = sl!().new(o!("steam id" => ctx.mh.stream_id)); + debug!(logger, "====> task service {:?}", &r); + let resp = s + .handler_message(r) + .await + .map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?; + debug!(logger, "<==== task service {:?}", &resp); + Ok(resp + .try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))?) +} + +macro_rules! impl_service { + ($($name: tt | $req: ty | $resp: ty),*) => { + #[async_trait] + impl shim_async::Task for TaskService { + $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { + handler_message(&self.handler, ctx, req).await + })* + } + }; +} + +impl_service!( + state | api::StateRequest | api::StateResponse, + create | api::CreateTaskRequest | api::CreateTaskResponse, + start | api::StartRequest | api::StartResponse, + delete | api::DeleteRequest | api::DeleteResponse, + pids | api::PidsRequest | api::PidsResponse, + pause | api::PauseRequest | api::Empty, + resume | api::ResumeRequest | api::Empty, + kill | api::KillRequest | api::Empty, + exec | api::ExecProcessRequest | api::Empty, + resize_pty | api::ResizePtyRequest | api::Empty, + update | api::UpdateTaskRequest | api::Empty, + wait | api::WaitRequest | api::WaitResponse, + stats | api::StatsRequest | api::StatsResponse, + connect | api::ConnectRequest | api::ConnectResponse, + shutdown | api::ShutdownRequest | api::Empty +); diff --git a/src/runtime-rs/crates/shim/Cargo.toml b/src/runtime-rs/crates/shim/Cargo.toml index e2c07e4e28..67fac11c13 100644 --- a/src/runtime-rs/crates/shim/Cargo.toml +++ b/src/runtime-rs/crates/shim/Cargo.toml @@ -19,7 +19,7 @@ containerd-shim-protos = { version = "0.2.0", features = ["async"]} go-flag = "0.1.0" libc = "0.2.108" log = "0.4.14" -nix = "0.16.0" +nix = "0.23.1" protobuf = "2.23.0" sha2 = "=0.9.3" slog = {version = "2.7.0", features = ["std", "release_max_level_trace", "max_level_trace"]} @@ -34,11 +34,13 @@ kata-types = { path = "../../../libs/kata-types"} kata-sys-util = { path = "../../../libs/kata-sys-util"} logging = { path = "../../../libs/logging"} oci = { path = "../../../libs/oci" } +service = { path = "../service" } [build-dependencies] vergen = { version = "6", default-features = false, features = ["build", "git", "rustc"] } [dev-dependencies] tempfile = "3.2.0" +rand = "0.8.4" serial_test = "0.5.1" tests_utils = { path = "../../tests/utils"} diff --git a/src/runtime-rs/crates/shim/src/args.rs b/src/runtime-rs/crates/shim/src/args.rs index 62b9653cca..0db33cbf81 100644 --- a/src/runtime-rs/crates/shim/src/args.rs +++ b/src/runtime-rs/crates/shim/src/args.rs @@ -51,8 +51,8 @@ impl Args { )))); } - validate::verify_cid(&self.id).context("verify cid")?; - validate::verify_cid(&self.namespace).context("verify namespace")?; + validate::verify_id(&self.id).context("verify container id")?; + validate::verify_id(&self.namespace).context("verify namespace")?; // Ensure `address` is a valid path. let path = PathBuf::from(self.address.clone()) diff --git a/src/runtime-rs/crates/shim/src/lib.rs b/src/runtime-rs/crates/shim/src/lib.rs index 1130cb7e45..000c5620a2 100644 --- a/src/runtime-rs/crates/shim/src/lib.rs +++ b/src/runtime-rs/crates/shim/src/lib.rs @@ -7,11 +7,7 @@ #[macro_use] extern crate slog; -macro_rules! sl { - () => { - slog_scope::logger().new(slog::o!("subsystem" => "shim")) - }; -} +logging::logger_with_subsystem!(sl, "shim"); mod args; pub use args::Args; diff --git a/src/runtime-rs/crates/shim/src/shim.rs b/src/runtime-rs/crates/shim/src/shim.rs index 164e6c6f60..298f6a3e00 100644 --- a/src/runtime-rs/crates/shim/src/shim.rs +++ b/src/runtime-rs/crates/shim/src/shim.rs @@ -31,7 +31,7 @@ impl ShimExecutor { } pub(crate) fn load_oci_spec(&self, path: &Path) -> Result { - let spec_file = path.join("config.json"); + let spec_file = path.join(oci::OCI_SPEC_CONFIG_FILE_NAME); oci::Spec::load(spec_file.to_str().unwrap_or_default()).context("load spec") } diff --git a/src/runtime-rs/crates/shim/src/shim_delete.rs b/src/runtime-rs/crates/shim/src/shim_delete.rs index 4d9890d138..57082d2129 100644 --- a/src/runtime-rs/crates/shim/src/shim_delete.rs +++ b/src/runtime-rs/crates/shim/src/shim_delete.rs @@ -5,7 +5,7 @@ // use anyhow::{Context, Result}; -use containerd_shim_protos::shim::shim::DeleteResponse; +use containerd_shim_protos::api; use protobuf::Message; use crate::{shim::ShimExecutor, Error}; @@ -19,8 +19,8 @@ impl ShimExecutor { Ok(()) } - fn do_cleanup(&self) -> Result { - let mut rsp = DeleteResponse::new(); + fn do_cleanup(&self) -> Result { + let mut rsp = api::DeleteResponse::new(); rsp.set_exit_status(128 + libc::SIGKILL as u32); let mut exited_time = protobuf::well_known_types::Timestamp::new(); let seconds = std::time::SystemTime::now() @@ -30,42 +30,7 @@ impl ShimExecutor { exited_time.set_seconds(seconds); rsp.set_exited_at(exited_time); - // TODO: implement cleanup + service::ServiceManager::cleanup(&self.args.id).context("cleanup")?; Ok(rsp) } } - -#[cfg(test)] -mod tests { - use serial_test::serial; - use tests_utils::gen_id; - - use super::*; - use crate::Args; - - #[test] - #[serial] - fn test_shim_delete() { - let dir = tempfile::tempdir().unwrap(); - let bundle_path = dir.path(); - std::env::set_current_dir(bundle_path).unwrap(); - - let id = gen_id(16); - let namespace = gen_id(16); - let args = Args { - id, - namespace, - address: "containerd_socket".into(), - publish_binary: "containerd".into(), - socket: "socket".into(), - bundle: bundle_path.to_str().unwrap().into(), - debug: false, - }; - - let executor = ShimExecutor::new(args); - - let resp = executor.do_cleanup().unwrap(); - assert_eq!(resp.exit_status, 128 + libc::SIGKILL as u32); - assert!(resp.exited_at.as_ref().unwrap().seconds > 0); - } -} diff --git a/src/runtime-rs/crates/shim/src/shim_run.rs b/src/runtime-rs/crates/shim/src/shim_run.rs index 76058964bd..112b835179 100644 --- a/src/runtime-rs/crates/shim/src/shim_run.rs +++ b/src/runtime-rs/crates/shim/src/shim_run.rs @@ -38,8 +38,11 @@ impl ShimExecutor { info!(sl!(), "start to run"); self.args.validate(false).context("validata")?; - let _server_fd = get_server_fd().context("get server fd")?; - // TODO: implement run + let server_fd = get_server_fd().context("get server fd")?; + let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd) + .await + .context("new runtime server")?; + service_manager.run().await.context("run")?; Ok(()) }