diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index f9e199efc8..2533ab1ebb 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -26,6 +26,7 @@ dependencies = [ "futures 0.1.31", "kata-types", "log", + "logging", "oci", "protobuf", "protocols", @@ -61,9 +62,9 @@ checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" [[package]] name = "async-trait" -version = "0.1.52" +version = "0.1.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" dependencies = [ "proc-macro2", "quote", @@ -93,9 +94,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "block-buffer" @@ -170,7 +171,7 @@ dependencies = [ "libc", "num-integer", "num-traits", - "time 0.1.43", + "time 0.1.44", "winapi", ] @@ -178,6 +179,7 @@ dependencies = [ name = "common" version = "0.1.0" dependencies = [ + "agent", "anyhow", "async-trait", "containerd-shim-protos", @@ -434,13 +436,24 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.5" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi 0.9.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", ] [[package]] @@ -463,9 +476,9 @@ checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" [[package]] name = "git2" -version = "0.14.2" +version = "0.13.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3826a6e0e2215d7a41c2bfc7c9244123969273f3476b939a226aac0ab56e9e3c" +checksum = "f29229cc1b24c0e6062f6e742aa3e256492a5323365e5ed3413599f8a5eff7d6" dependencies = [ "bitflags", "libc", @@ -547,9 +560,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ "autocfg", "hashbrown", @@ -601,6 +614,7 @@ dependencies = [ name = "kata-sys-util" version = "0.1.0" dependencies = [ + "byteorder", "cgroups-rs", "chrono", "common-path", @@ -611,6 +625,7 @@ dependencies = [ "nix 0.23.1", "oci", "once_cell", + "rand 0.7.3", "serde_json", "slog", "slog-scope", @@ -643,15 +658,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259" [[package]] name = "libgit2-sys" -version = "0.13.2+1.4.2" +version = "0.12.26+1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a42de9a51a5c12e00fc0e4ca6bc2ea43582fc6418488e8f615e905d886f258b" +checksum = "19e1c899248e606fbfe68dcb31d8b0176ebab833b103824af31bddf4b7457494" dependencies = [ "cc", "libc", @@ -683,18 +698,19 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" dependencies = [ + "autocfg", "scopeguard", ] [[package]] name = "log" -version = "0.4.14" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" dependencies = [ "cfg-if 1.0.0", ] @@ -929,9 +945,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" [[package]] name = "ppv-lite86" @@ -965,9 +981,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" dependencies = [ "unicode-xid", ] @@ -1065,9 +1081,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.16" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -1095,6 +1111,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -1102,10 +1131,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", + "rand_chacha 0.3.1", "rand_core 0.6.3", ] +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -1131,13 +1170,31 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", +] + [[package]] name = "rand_core" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom", + "getrandom 0.2.6", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -1151,9 +1208,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" dependencies = [ "bitflags", ] @@ -1260,9 +1317,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "semver" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" +checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" [[package]] name = "serde" @@ -1389,9 +1446,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "slog" @@ -1413,14 +1470,14 @@ dependencies = [ [[package]] name = "slog-json" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70f825ce7346f40aa318111df5d3a94945a7fdca9081584cb9b05692fb3dfcb4" +checksum = "3e1e53f61af1e3c8b852eef0a9dee29008f55d6dd63794f3f12cef786cf0f219" dependencies = [ "serde", "serde_json", "slog", - "time 0.3.7", + "time 0.3.9", ] [[package]] @@ -1436,9 +1493,9 @@ dependencies = [ [[package]] name = "slog-stdlog" -version = "4.1.0" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8228ab7302adbf4fcb37e66f3cda78003feb521e7fd9e3847ec117a7784d0f5a" +checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" dependencies = [ "log", "slog", @@ -1495,9 +1552,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" dependencies = [ "proc-macro2", "quote", @@ -1562,19 +1619,20 @@ dependencies = [ [[package]] name = "time" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi", ] [[package]] name = "time" -version = "0.3.7" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" +checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" dependencies = [ "itoa", "libc", @@ -1815,9 +1873,15 @@ dependencies = [ [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasi" diff --git a/src/runtime-rs/crates/agent/Cargo.toml b/src/runtime-rs/crates/agent/Cargo.toml index bd17f82d00..7fde8bf85c 100644 --- a/src/runtime-rs/crates/agent/Cargo.toml +++ b/src/runtime-rs/crates/agent/Cargo.toml @@ -21,6 +21,7 @@ tokio = { version = "1.8.0", features = ["fs", "rt"] } url = "2.2.2" kata-types = { path = "../../../libs/kata-types"} +logging = { path = "../../../libs/logging"} oci = { path = "../../../libs/oci" } protocols = { path = "../../../libs/protocols", features=["async"] } diff --git a/src/runtime-rs/crates/agent/src/kata/agent.rs b/src/runtime-rs/crates/agent/src/kata/agent.rs index 9f8b4304d3..90a812d441 100644 --- a/src/runtime-rs/crates/agent/src/kata/agent.rs +++ b/src/runtime-rs/crates/agent/src/kata/agent.rs @@ -20,14 +20,11 @@ fn new_ttrpc_ctx(timeout: i64) -> ttrpc_ctx::Context { #[async_trait] impl AgentManager for KataAgent { - async fn set_socket_address(&self, address: &str) -> Result<()> { - let mut inner = self.inner.lock().await; - inner.socket_address = address.to_string(); - Ok(()) - } - - async fn start(&self) -> Result<()> { - info!(sl!(), "begin to connect agent"); + async fn start(&self, address: &str) -> Result<()> { + info!(sl!(), "begin to connect agent {:?}", address); + self.set_socket_address(address) + .await + .context("set socket")?; self.connect_agent_server() .await .context("connect agent server")?; diff --git a/src/runtime-rs/crates/agent/src/kata/mod.rs b/src/runtime-rs/crates/agent/src/kata/mod.rs index 043b9aa14e..dd7831d35b 100644 --- a/src/runtime-rs/crates/agent/src/kata/mod.rs +++ b/src/runtime-rs/crates/agent/src/kata/mod.rs @@ -82,6 +82,12 @@ impl KataAgent { }) } + pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> { + let mut inner = self.inner.lock().await; + inner.socket_address = address.to_string(); + Ok(()) + } + pub(crate) async fn connect_agent_server(&self) -> Result<()> { let mut inner = self.inner.lock().await; diff --git a/src/runtime-rs/crates/agent/src/kata/trans.rs b/src/runtime-rs/crates/agent/src/kata/trans.rs index c8e4dbca7c..033f0bd979 100644 --- a/src/runtime-rs/crates/agent/src/kata/trans.rs +++ b/src/runtime-rs/crates/agent/src/kata/trans.rs @@ -254,8 +254,8 @@ impl From for Routes { impl From for agent::CreateContainerRequest { fn from(from: CreateContainerRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), string_user: from_option(from.string_user), devices: from_vec(from.devices), storages: from_vec(from.storages), @@ -321,8 +321,8 @@ impl From for agent::ResumeContainerRequest { impl From for agent::SignalProcessRequest { fn from(from: SignalProcessRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), signal: from.signal, unknown_fields: Default::default(), cached_size: Default::default(), @@ -333,8 +333,8 @@ impl From for agent::SignalProcessRequest { impl From for agent::WaitProcessRequest { fn from(from: WaitProcessRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), unknown_fields: Default::default(), cached_size: Default::default(), } @@ -355,8 +355,8 @@ impl From for agent::UpdateContainerRequest { impl From for agent::WriteStreamRequest { fn from(from: WriteStreamRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), data: from.data, unknown_fields: Default::default(), cached_size: Default::default(), @@ -373,8 +373,8 @@ impl From for WriteStreamResponse { impl From for agent::ExecProcessRequest { fn from(from: ExecProcessRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), string_user: from_option(from.string_user), process: from_option(from.process), unknown_fields: Default::default(), @@ -523,8 +523,8 @@ impl From for StatsContainerResponse { impl From for agent::ReadStreamRequest { fn from(from: ReadStreamRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), len: from.len, unknown_fields: Default::default(), cached_size: Default::default(), @@ -541,8 +541,8 @@ impl From for ReadStreamResponse { impl From for agent::CloseStdinRequest { fn from(from: CloseStdinRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), unknown_fields: Default::default(), cached_size: Default::default(), } @@ -552,8 +552,8 @@ impl From for agent::CloseStdinRequest { impl From for agent::TtyWinResizeRequest { fn from(from: TtyWinResizeRequest) -> Self { Self { - container_id: from.container_id, - exec_id: from.exec_id, + container_id: from.process_id.container_id(), + exec_id: from.process_id.exec_id(), row: from.row, column: from.column, unknown_fields: Default::default(), diff --git a/src/runtime-rs/crates/agent/src/lib.rs b/src/runtime-rs/crates/agent/src/lib.rs index a9d8a15f92..9c72a76847 100644 --- a/src/runtime-rs/crates/agent/src/lib.rs +++ b/src/runtime-rs/crates/agent/src/lib.rs @@ -7,11 +7,7 @@ #[macro_use] extern crate slog; -macro_rules! sl { - () => { - slog_scope::logger().new(slog::o!("subsystem" => "agent")) - }; -} +logging::logger_with_subsystem!(sl, "agent"); pub mod kata; mod log_forwarder; @@ -19,14 +15,15 @@ mod sock; mod types; pub use types::{ ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, BlkioStatsEntry, CheckRequest, - CloseStdinRequest, ContainerID, CopyFileRequest, CreateContainerRequest, CreateSandboxRequest, - Empty, ExecProcessRequest, GetGuestDetailsRequest, GuestDetailsResponse, HealthCheckResponse, - IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest, MemHotplugByProbeRequest, - OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, ReadStreamResponse, - RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes, SetGuestDateTimeRequest, - SignalProcessRequest, StatsContainerResponse, Storage, TtyWinResizeRequest, - UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest, VersionCheckResponse, - WaitProcessRequest, WaitProcessResponse, WriteStreamRequest, WriteStreamResponse, + CloseStdinRequest, ContainerID, ContainerProcessID, CopyFileRequest, CreateContainerRequest, + CreateSandboxRequest, Empty, ExecProcessRequest, GetGuestDetailsRequest, GuestDetailsResponse, + HealthCheckResponse, IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest, + MemHotplugByProbeRequest, OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, + ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes, + SetGuestDateTimeRequest, SignalProcessRequest, StatsContainerResponse, Storage, + TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest, + VersionCheckResponse, WaitProcessRequest, WaitProcessResponse, WriteStreamRequest, + WriteStreamResponse, }; use anyhow::Result; @@ -34,8 +31,7 @@ use async_trait::async_trait; #[async_trait] pub trait AgentManager: Send + Sync { - async fn set_socket_address(&self, address: &str) -> Result<()>; - async fn start(&self) -> Result<()>; + async fn start(&self, address: &str) -> Result<()>; async fn stop(&self); } @@ -57,6 +53,7 @@ pub trait Agent: AgentManager + HealthService + Send + Sync { async fn list_routes(&self, req: Empty) -> Result; async fn update_interface(&self, req: UpdateInterfaceRequest) -> Result; async fn update_routes(&self, req: UpdateRoutesRequest) -> Result; + // container async fn create_container(&self, req: CreateContainerRequest) -> Result; async fn pause_container(&self, req: ContainerID) -> Result; diff --git a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs index 7079ea392e..59e93a64d2 100644 --- a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs +++ b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs @@ -56,7 +56,7 @@ impl Sock for HybridVsock { } } } - Err(anyhow!("cannot connect to agent ttrpc server")) + Err(anyhow!("cannot connect to agent ttrpc server {:?}", config)) } } diff --git a/src/runtime-rs/crates/agent/src/sock/mod.rs b/src/runtime-rs/crates/agent/src/sock/mod.rs index 3ec4da6aa9..371f62cd44 100644 --- a/src/runtime-rs/crates/agent/src/sock/mod.rs +++ b/src/runtime-rs/crates/agent/src/sock/mod.rs @@ -79,6 +79,7 @@ impl AsyncRead for Stream { } /// Connect config +#[derive(Debug)] pub struct ConnectConfig { dial_timeout_ms: u64, reconnect_timeout_ms: u64, diff --git a/src/runtime-rs/crates/agent/src/types.rs b/src/runtime-rs/crates/agent/src/types.rs index 41e0e3ee36..caf507c9ae 100644 --- a/src/runtime-rs/crates/agent/src/types.rs +++ b/src/runtime-rs/crates/agent/src/types.rs @@ -97,8 +97,7 @@ pub struct Routes { #[derive(PartialEq, Clone, Default)] pub struct CreateContainerRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub string_user: Option, pub devices: Vec, pub storages: Vec, @@ -121,6 +120,29 @@ impl ContainerID { } } +#[derive(PartialEq, Clone, Default)] +pub struct ContainerProcessID { + pub container_id: ContainerID, + pub exec_id: String, +} + +impl ContainerProcessID { + pub fn new(container_id: &str, exec_id: &str) -> Self { + Self { + container_id: ContainerID::new(container_id), + exec_id: exec_id.to_string(), + } + } + + pub fn container_id(&self) -> String { + self.container_id.container_id.clone() + } + + pub fn exec_id(&self) -> String { + self.exec_id.clone() + } +} + #[derive(PartialEq, Clone, Debug, Default)] pub struct RemoveContainerRequest { pub container_id: String, @@ -138,15 +160,13 @@ impl RemoveContainerRequest { #[derive(PartialEq, Clone, Default)] pub struct SignalProcessRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub signal: u32, } #[derive(PartialEq, Clone, Default)] pub struct WaitProcessRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, } #[derive(PartialEq, Clone, Default)] @@ -165,8 +185,7 @@ pub struct UpdateContainerRequest { #[derive(PartialEq, Clone, Default)] pub struct WriteStreamRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub data: Vec, } @@ -177,8 +196,7 @@ pub struct WriteStreamResponse { #[derive(PartialEq, Clone, Default)] pub struct ExecProcessRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub string_user: Option, pub process: Option, } @@ -297,8 +315,7 @@ pub struct WaitProcessResponse { #[derive(PartialEq, Clone, Default)] pub struct ReadStreamRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub len: u32, } @@ -309,14 +326,12 @@ pub struct ReadStreamResponse { #[derive(PartialEq, Clone, Default)] pub struct CloseStdinRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, } #[derive(PartialEq, Clone, Default)] pub struct TtyWinResizeRequest { - pub container_id: String, - pub exec_id: String, + pub process_id: ContainerProcessID, pub row: u32, pub column: u32, } diff --git a/src/runtime-rs/crates/runtimes/common/Cargo.toml b/src/runtime-rs/crates/runtimes/common/Cargo.toml index f2bff39c84..bfa08452b7 100644 --- a/src/runtime-rs/crates/runtimes/common/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/common/Cargo.toml @@ -21,6 +21,7 @@ thiserror = "^1.0" tokio = { version = "1.8.0", features = ["rt-multi-thread", "process", "fs"] } ttrpc = { version = "0.6.0" } +agent = { path = "../../agent" } kata-sys-util = { path = "../../../../libs/kata-sys-util" } kata-types = { path = "../../../../libs/kata-types" } oci = { path = "../../../../libs/oci" } diff --git a/src/runtime-rs/crates/runtimes/common/src/container_manager.rs b/src/runtime-rs/crates/runtimes/common/src/container_manager.rs index aeba770a66..040b557ee6 100644 --- a/src/runtime-rs/crates/runtimes/common/src/container_manager.rs +++ b/src/runtime-rs/crates/runtimes/common/src/container_manager.rs @@ -16,7 +16,7 @@ use crate::types::{ #[async_trait] pub trait ContainerManager: Send + Sync { // container lifecycle - async fn create_container(&self, config: ContainerConfig) -> Result; + async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result; async fn pause_container(&self, container_id: &ContainerID) -> Result<()>; async fn resume_container(&self, container_id: &ContainerID) -> Result<()>; async fn stats_container(&self, container_id: &ContainerID) -> Result; diff --git a/src/runtime-rs/crates/runtimes/common/src/message.rs b/src/runtime-rs/crates/runtimes/common/src/message.rs index ff6ee960d7..856a6e5990 100644 --- a/src/runtime-rs/crates/runtimes/common/src/message.rs +++ b/src/runtime-rs/crates/runtimes/common/src/message.rs @@ -3,8 +3,10 @@ // // SPDX-License-Identifier: Apache-2.0 // +use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; +use containerd_shim_protos::{events::task::TaskOOM, protobuf::Message as ProtobufMessage}; use tokio::sync::mpsc::{channel, Receiver, Sender}; /// message receiver buffer size @@ -15,8 +17,12 @@ pub enum Action { Start, Stop, Shutdown, + Event(Arc), } +unsafe impl Send for Message {} +unsafe impl Sync for Message {} + #[derive(Debug)] pub struct Message { pub action: Action, @@ -42,3 +48,25 @@ impl Message { ) } } + +const TASK_OOM_EVENT_TOPIC: &str = "/tasks/oom"; + +pub trait Event: std::fmt::Debug + Send { + fn r#type(&self) -> String; + fn type_url(&self) -> String; + fn value(&self) -> Result>; +} + +impl Event for TaskOOM { + fn r#type(&self) -> String { + TASK_OOM_EVENT_TOPIC.to_string() + } + + fn type_url(&self) -> String { + "containerd.events.TaskOOM".to_string() + } + + fn value(&self) -> Result> { + self.write_to_bytes().context("get oom value") + } +} diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index 699fc1977a..1b175204c5 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -7,9 +7,11 @@ use anyhow::Result; use async_trait::async_trait; +use kata_types::config::TomlConfig; + #[async_trait] pub trait Sandbox: Send + Sync { - async fn start(&self) -> Result<()>; + async fn start(&self, netns: Option, config: &TomlConfig) -> Result<()>; async fn stop(&self) -> Result<()>; async fn cleanup(&self, container_id: &str) -> Result<()>; async fn shutdown(&self) -> Result<()>; diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index e398735ff7..14f188d7d3 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -4,7 +4,9 @@ // SPDX-License-Identifier: Apache-2.0 // +mod trans_from_agent; mod trans_from_shim; +mod trans_into_agent; mod trans_into_shim; use std::fmt; @@ -69,6 +71,12 @@ pub struct ContainerID { pub container_id: String, } +impl ToString for ContainerID { + fn to_string(&self) -> String { + self.container_id.clone() + } +} + impl ContainerID { pub fn new(container_id: &str) -> Result { validate::verify_id(container_id).context("verify container id")?; @@ -105,6 +113,14 @@ impl ContainerProcess { process_type, }) } + + pub fn container_id(&self) -> &str { + &self.container_id.container_id + } + + pub fn exec_id(&self) -> &str { + &self.exec_id + } } #[derive(Debug, Clone)] pub struct ContainerConfig { @@ -130,7 +146,7 @@ impl PID { #[derive(Debug, Clone)] pub struct KillRequest { - pub process_id: ContainerProcess, + pub process: ContainerProcess, pub signal: u32, pub all: bool, } @@ -143,14 +159,14 @@ pub struct ShutdownRequest { #[derive(Debug, Clone)] pub struct ResizePTYRequest { - pub process_id: ContainerProcess, + pub process: ContainerProcess, pub width: u32, pub height: u32, } #[derive(Debug, Clone)] pub struct ExecProcessRequest { - pub process_id: ContainerProcess, + pub process: ContainerProcess, pub terminal: bool, pub stdin: Option, pub stdout: Option, diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_agent.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_agent.rs new file mode 100644 index 0000000000..8877771223 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_agent.rs @@ -0,0 +1,214 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::convert::From; + +use containerd_shim_protos::cgroups::metrics; +use protobuf::Message; + +use super::{StatsInfo, StatsInfoValue}; + +// TODO: trans from agent proto? +impl From> for StatsInfo { + fn from(c_stats: Option) -> Self { + let mut metric = metrics::Metrics::new(); + let stats = match c_stats { + None => { + return StatsInfo { value: None }; + } + Some(stats) => stats, + }; + + if let Some(cg_stats) = stats.cgroup_stats { + if let Some(cpu) = cg_stats.cpu_stats { + // set protobuf cpu stat + let mut p_cpu = metrics::CPUStat::new(); + if let Some(usage) = cpu.cpu_usage { + let mut p_usage = metrics::CPUUsage::new(); + p_usage.set_total(usage.total_usage); + p_usage.set_per_cpu(usage.percpu_usage); + p_usage.set_kernel(usage.usage_in_kernelmode); + p_usage.set_user(usage.usage_in_usermode); + + // set protobuf cpu usage + p_cpu.set_usage(p_usage); + } + + if let Some(throttle) = cpu.throttling_data { + let mut p_throttle = metrics::Throttle::new(); + p_throttle.set_periods(throttle.periods); + p_throttle.set_throttled_time(throttle.throttled_time); + p_throttle.set_throttled_periods(throttle.throttled_periods); + + // set protobuf cpu usage + p_cpu.set_throttling(p_throttle); + } + + metric.set_cpu(p_cpu); + } + + if let Some(m_stats) = cg_stats.memory_stats { + let mut p_m = metrics::MemoryStat::new(); + p_m.set_cache(m_stats.cache); + // memory usage + if let Some(m_data) = m_stats.usage { + let mut p_m_entry = metrics::MemoryEntry::new(); + p_m_entry.set_usage(m_data.usage); + p_m_entry.set_limit(m_data.limit); + p_m_entry.set_failcnt(m_data.failcnt); + p_m_entry.set_max(m_data.max_usage); + + p_m.set_usage(p_m_entry); + } + // memory swap_usage + if let Some(m_data) = m_stats.swap_usage { + let mut p_m_entry = metrics::MemoryEntry::new(); + p_m_entry.set_usage(m_data.usage); + p_m_entry.set_limit(m_data.limit); + p_m_entry.set_failcnt(m_data.failcnt); + p_m_entry.set_max(m_data.max_usage); + + p_m.set_swap(p_m_entry); + } + // memory kernel_usage + if let Some(m_data) = m_stats.kernel_usage { + let mut p_m_entry = metrics::MemoryEntry::new(); + p_m_entry.set_usage(m_data.usage); + p_m_entry.set_limit(m_data.limit); + p_m_entry.set_failcnt(m_data.failcnt); + p_m_entry.set_max(m_data.max_usage); + + p_m.set_kernel(p_m_entry); + } + + for (k, v) in m_stats.stats { + match k.as_str() { + "dirty" => p_m.set_dirty(v), + "rss" => p_m.set_rss(v), + "rss_huge" => p_m.set_rss_huge(v), + "mapped_file" => p_m.set_mapped_file(v), + "writeback" => p_m.set_writeback(v), + "pg_pg_in" => p_m.set_pg_pg_in(v), + "pg_pg_out" => p_m.set_pg_pg_out(v), + "pg_fault" => p_m.set_pg_fault(v), + "pg_maj_fault" => p_m.set_pg_maj_fault(v), + "inactive_file" => p_m.set_inactive_file(v), + "inactive_anon" => p_m.set_inactive_anon(v), + "active_file" => p_m.set_active_file(v), + "unevictable" => p_m.set_unevictable(v), + "hierarchical_memory_limit" => p_m.set_hierarchical_memory_limit(v), + "hierarchical_swap_limit" => p_m.set_hierarchical_swap_limit(v), + "total_cache" => p_m.set_total_cache(v), + "total_rss" => p_m.set_total_rss(v), + "total_mapped_file" => p_m.set_total_mapped_file(v), + "total_dirty" => p_m.set_total_dirty(v), + + "total_pg_pg_in" => p_m.set_total_pg_pg_in(v), + "total_pg_pg_out" => p_m.set_total_pg_pg_out(v), + "total_pg_fault" => p_m.set_total_pg_fault(v), + "total_pg_maj_fault" => p_m.set_total_pg_maj_fault(v), + "total_inactive_file" => p_m.set_total_inactive_file(v), + "total_inactive_anon" => p_m.set_total_inactive_anon(v), + "total_active_file" => p_m.set_total_active_file(v), + "total_unevictable" => p_m.set_total_unevictable(v), + _ => (), + } + } + metric.set_memory(p_m); + } + + if let Some(pid_stats) = cg_stats.pids_stats { + let mut p_pid = metrics::PidsStat::new(); + p_pid.set_limit(pid_stats.limit); + p_pid.set_current(pid_stats.current); + metric.set_pids(p_pid); + } + + if let Some(blk_stats) = cg_stats.blkio_stats { + let mut p_blk_stats = metrics::BlkIOStat::new(); + p_blk_stats + .set_io_serviced_recursive(copy_blkio_entry(&blk_stats.io_serviced_recursive)); + p_blk_stats.set_io_service_bytes_recursive(copy_blkio_entry( + &blk_stats.io_service_bytes_recursive, + )); + p_blk_stats + .set_io_queued_recursive(copy_blkio_entry(&blk_stats.io_queued_recursive)); + p_blk_stats.set_io_service_time_recursive(copy_blkio_entry( + &blk_stats.io_service_time_recursive, + )); + p_blk_stats.set_io_wait_time_recursive(copy_blkio_entry( + &blk_stats.io_wait_time_recursive, + )); + p_blk_stats + .set_io_merged_recursive(copy_blkio_entry(&blk_stats.io_merged_recursive)); + p_blk_stats.set_io_time_recursive(copy_blkio_entry(&blk_stats.io_time_recursive)); + p_blk_stats.set_sectors_recursive(copy_blkio_entry(&blk_stats.sectors_recursive)); + + metric.set_blkio(p_blk_stats); + } + + if !cg_stats.hugetlb_stats.is_empty() { + let mut p_huge = ::protobuf::RepeatedField::new(); + for (k, v) in cg_stats.hugetlb_stats { + let mut h = metrics::HugetlbStat::new(); + h.set_pagesize(k); + h.set_max(v.max_usage); + h.set_usage(v.usage); + h.set_failcnt(v.failcnt); + p_huge.push(h); + } + metric.set_hugetlb(p_huge); + } + } + + let net_stats = stats.network_stats; + if !net_stats.is_empty() { + let mut p_net = ::protobuf::RepeatedField::new(); + for v in net_stats.iter() { + let mut h = metrics::NetworkStat::new(); + h.set_name(v.name.clone()); + + h.set_tx_bytes(v.tx_bytes); + h.set_tx_packets(v.tx_packets); + h.set_tx_errors(v.tx_errors); + h.set_tx_dropped(v.tx_dropped); + + h.set_rx_bytes(v.rx_bytes); + h.set_rx_packets(v.rx_packets); + h.set_rx_errors(v.rx_errors); + h.set_rx_dropped(v.rx_dropped); + + p_net.push(h); + } + metric.set_network(p_net); + } + + StatsInfo { + value: Some(StatsInfoValue { + type_url: "io.containerd.cgroups.v1.Metrics".to_string(), + value: metric.write_to_bytes().unwrap(), + }), + } + } +} + +fn copy_blkio_entry( + entry: &[agent::BlkioStatsEntry], +) -> ::protobuf::RepeatedField { + let mut p_entry = ::protobuf::RepeatedField::new(); + + for e in entry.iter() { + let mut blk = metrics::BlkIOEntry::new(); + blk.set_op(e.op.clone()); + blk.set_value(e.value); + blk.set_major(e.major); + blk.set_minor(e.minor); + + p_entry.push(blk); + } + + p_entry +} diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs index c26bb68281..07f1f8d79e 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -82,7 +82,7 @@ impl TryFrom for Request { fn try_from(from: api::ExecProcessRequest) -> Result { let spec = from.get_spec(); Ok(Request::ExecProcess(ExecProcessRequest { - process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, terminal: from.terminal, stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()), @@ -97,7 +97,7 @@ impl TryFrom for Request { type Error = anyhow::Error; fn try_from(from: api::KillRequest) -> Result { Ok(Request::KillProcess(KillRequest { - process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, signal: from.signal, all: from.all, })) @@ -145,7 +145,7 @@ impl TryFrom for Request { type Error = anyhow::Error; fn try_from(from: api::ResizePtyRequest) -> Result { Ok(Request::ResizeProcessPTY(ResizePTYRequest { - process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, + process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, width: from.width, height: from.height, })) diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_agent.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_agent.rs new file mode 100644 index 0000000000..f032fd70bc --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_agent.rs @@ -0,0 +1,28 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::convert::From; + +use agent; + +use super::{ContainerID, ContainerProcess}; + +impl From for agent::ContainerID { + fn from(from: ContainerID) -> Self { + Self { + container_id: from.container_id, + } + } +} + +impl From for agent::ContainerProcessID { + fn from(from: ContainerProcess) -> Self { + Self { + container_id: from.container_id.into(), + exec_id: from.exec_id, + } + } +} diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index f3006f8561..cfeab919be 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -25,7 +25,7 @@ use wasm_container::WasmContainer; struct RuntimeHandlerManagerInner { id: String, msg_sender: Sender, - runtime_instance: Option, + runtime_instance: Option>, } impl RuntimeHandlerManagerInner { @@ -37,26 +37,22 @@ impl RuntimeHandlerManagerInner { }) } - async fn init_runtime_handler(&mut self, runtime_name: &str) -> Result<()> { - info!(sl!(), "new runtime handler {}", runtime_name); - - let runtime_handler = match runtime_name { + async fn init_runtime_handler( + &mut self, + netns: Option, + config: &TomlConfig, + ) -> Result<()> { + info!(sl!(), "new runtime handler {}", &config.runtime.name); + let runtime_handler = match config.runtime.name.as_str() { #[cfg(feature = "linux")] - name if name == LinuxContainer::name() => { - LinuxContainer::init().context("init linux container")?; - LinuxContainer::new_handler() - } + name if name == LinuxContainer::name() => LinuxContainer::new_handler(), #[cfg(feature = "wasm")] - name if name == WasmContainer::name() => { - WasmContainer::init().context("init wasm container")?; - WasmContainer::new_handler() - } + name if name == WasmContainer::name() => WasmContainer::new_handler(), #[cfg(feature = "virt")] - name if name == VirtContainer::name() => { - VirtContainer::init().context("init virt container")?; + name if name == VirtContainer::name() || name.is_empty() => { VirtContainer::new_handler() } - _ => return Err(anyhow!("Unsupported runtime: {}", runtime_name)), + _ => return Err(anyhow!("Unsupported runtime: {}", &config.runtime.name)), }; let runtime_instance = runtime_handler .new_instance(&self.id, self.msg_sender.clone()) @@ -66,10 +62,10 @@ impl RuntimeHandlerManagerInner { // start sandbox runtime_instance .sandbox - .start() + .start(netns, config) .await .context("start sandbox")?; - self.runtime_instance = Some(runtime_instance); + self.runtime_instance = Some(Arc::new(runtime_instance)); Ok(()) } @@ -79,15 +75,39 @@ impl RuntimeHandlerManagerInner { return Ok(()); } + #[cfg(feature = "linux")] + LinuxContainer::init().context("init linux container")?; + #[cfg(feature = "wasm")] + WasmContainer::init().context("init wasm container")?; + #[cfg(feature = "virt")] + VirtContainer::init().context("init virt container")?; + + let netns = if let Some(linux) = &spec.linux { + let mut netns = None; + for ns in &linux.namespaces { + if ns.r#type.as_str() != oci::NETWORKNAMESPACE { + continue; + } + + if !ns.path.is_empty() { + netns = Some(ns.path.clone()); + break; + } + } + netns + } else { + None + }; + let config = load_config(spec).context("load config")?; - self.init_runtime_handler(&config.runtime.name) + self.init_runtime_handler(netns, &config) .await .context("init runtime handler")?; Ok(()) } - fn get_runtime_instance(&self) -> Option { + fn get_runtime_instance(&self) -> Option> { self.runtime_instance.clone() } } @@ -112,25 +132,35 @@ impl RuntimeHandlerManager { Ok(()) } + async fn get_runtime_instance(&self) -> Result> { + let inner = self.inner.read().await; + inner + .get_runtime_instance() + .ok_or_else(|| anyhow!("runtime not ready")) + } + + async fn try_init_runtime_instance(&self, spec: &oci::Spec) -> Result<()> { + let mut inner = self.inner.write().await; + inner.try_init(spec).await + } + pub async fn handler_message(&self, req: Request) -> Result { if let Request::CreateContainer(req) = req { // get oci spec let bundler_path = format!("{}/{}", req.bundle, oci::OCI_SPEC_CONFIG_FILE_NAME); let spec = oci::Spec::load(&bundler_path).context("load spec")?; - let mut inner = self.inner.write().await; - inner - .try_init(&spec) + self.try_init_runtime_instance(&spec) .await - .context("try init runtime handler")?; - - let instance = inner + .context("try init runtime instance")?; + let instance = self .get_runtime_instance() - .ok_or_else(|| anyhow!("runtime not ready"))?; + .await + .context("get runtime instance")?; let shim_pid = instance .container_manager - .create_container(req) + .create_container(req, spec) .await .context("create container")?; Ok(Response::CreateContainer(shim_pid)) @@ -140,12 +170,12 @@ impl RuntimeHandlerManager { } pub async fn handler_request(&self, req: Request) -> Result { - let inner = self.inner.read().await; - let instance = inner + let instance = self .get_runtime_instance() - .ok_or_else(|| anyhow!("runtime not ready"))?; - let sandbox = instance.sandbox; - let cm = instance.container_manager; + .await + .context("get runtime instance")?; + let sandbox = instance.sandbox.clone(); + let cm = instance.container_manager.clone(); match req { Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)), diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index dac1b2275f..d22cdf86f1 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -5,15 +5,24 @@ // use std::{ + fs, os::unix::io::{FromRawFd, RawFd}, + process::Stdio, sync::Arc, }; use anyhow::{Context, Result}; -use common::message::{Action, Message}; -use containerd_shim_protos::shim_async; +use common::message::{Action, Event, Message}; +use containerd_shim_protos::{ + protobuf::{well_known_types::Any, Message as ProtobufMessage}, + shim_async, +}; use runtimes::RuntimeHandlerManager; -use tokio::sync::mpsc::{channel, Receiver}; +use tokio::{ + io::AsyncWriteExt, + process::Command, + sync::mpsc::{channel, Receiver}, +}; use ttrpc::asynchronous::Server; use crate::task_service::TaskService; @@ -21,14 +30,66 @@ use crate::task_service::TaskService; /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; +pub const KATA_PATH: &str = "/run/kata"; + pub struct ServiceManager { receiver: Option>, handler: Arc, task_server: Option, + binary: String, + address: String, + namespace: String, +} + +async fn send_event( + containerd_binary: String, + address: String, + namespace: String, + event: Arc, +) -> Result<()> { + let any = Any { + type_url: event.type_url(), + value: event.value().context("get event value")?, + ..Default::default() + }; + let data = any.write_to_bytes().context("write to any")?; + let mut child = Command::new(containerd_binary) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .args(&[ + "--address", + &address, + "publish", + "--topic", + &event.r#type(), + "--namespace", + &namespace, + ]) + .spawn() + .context("sawn cmd")?; + + let stdin = child.stdin.as_mut().context("failed to open stdin")?; + stdin + .write_all(&data) + .await + .context("failed to write to stdin")?; + let output = child + .wait_with_output() + .await + .context("failed to read stdout")?; + info!(sl!(), "get output: {:?}", output); + Ok(()) } impl ServiceManager { - pub async fn new(id: &str, task_server_fd: RawFd) -> Result { + pub async fn new( + id: &str, + containerd_binary: &str, + address: &str, + namespace: &str, + task_server_fd: RawFd, + ) -> Result { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); let handler = Arc::new( RuntimeHandlerManager::new(id, sender) @@ -41,13 +102,17 @@ impl ServiceManager { receiver: Some(receiver), handler, task_server: Some(task_server), + binary: containerd_binary.to_string(), + address: address.to_string(), + namespace: namespace.to_string(), }) } pub async fn run(&mut self) -> Result<()> { info!(sl!(), "begin to run service"); - self.start().await.context("start")?; + + info!(sl!(), "wait server message"); let mut rx = self.receiver.take(); if let Some(rx) = rx.as_mut() { while let Some(r) = rx.recv().await { @@ -59,9 +124,24 @@ impl ServiceManager { self.stop_listen().await.context("stop listen")?; break; } + Action::Event(event) => { + info!(sl!(), "get event {:?}", &event); + send_event( + self.binary.clone(), + self.address.clone(), + self.namespace.clone(), + event, + ) + .await + .context("send event")?; + Ok(()) + } }; if let Some(ref sender) = r.resp_sender { + if let Err(err) = result.as_ref() { + error!(sl!(), "failed to process action {:?}", err); + } sender.send(result).await.context("send response")?; } } @@ -72,8 +152,18 @@ impl ServiceManager { Ok(()) } - pub fn cleanup(id: &str) -> Result<()> { - RuntimeHandlerManager::cleanup(id) + pub fn cleanup(sid: &str) -> Result<()> { + let temp_dir = [KATA_PATH, sid].join("/"); + if std::fs::metadata(temp_dir.as_str()).is_ok() { + // try to remove dir and skip the result + fs::remove_dir_all(temp_dir) + .map_err(|err| { + warn!(sl!(), "failed to clean up sandbox tmp dir"); + err + }) + .ok(); + } + Ok(()) } async fn start(&mut self) -> Result<()> { diff --git a/src/runtime-rs/crates/shim/src/shim.rs b/src/runtime-rs/crates/shim/src/shim.rs index 298f6a3e00..83060f2b22 100644 --- a/src/runtime-rs/crates/shim/src/shim.rs +++ b/src/runtime-rs/crates/shim/src/shim.rs @@ -60,10 +60,6 @@ impl ShimExecutor { data.parse::().context(Error::ParsePid) } - pub(crate) fn get_bundle_path(&self) -> Result { - std::env::current_dir().context(Error::GetBundlePath) - } - pub(crate) fn socket_address(&self, id: &str) -> Result { if id.is_empty() { return Err(anyhow!(Error::EmptySandboxId)); @@ -72,8 +68,6 @@ impl ShimExecutor { let data = [&self.args.address, &self.args.namespace, id].join("/"); let mut hasher = sha2::Sha256::new(); hasher.update(data); - - // Follow // https://github.com/containerd/containerd/blob/main/runtime/v2/shim/util_unix.go#L68 to // generate a shim socket path. Ok(PathBuf::from(format!( @@ -89,6 +83,8 @@ mod tests { use super::*; use serial_test::serial; + use kata_sys_util::spec::get_bundle_path; + #[test] #[serial] fn test_shim_executor() { @@ -111,7 +107,7 @@ mod tests { executor .write_address(bundle_path, Path::new("12345")) .unwrap(); - let dir = executor.get_bundle_path().unwrap(); + let dir = get_bundle_path().unwrap(); let file_path = &dir.join("address"); let buf = std::fs::read_to_string(file_path).unwrap(); assert_eq!(&buf, "12345"); diff --git a/src/runtime-rs/crates/shim/src/shim_delete.rs b/src/runtime-rs/crates/shim/src/shim_delete.rs index 57082d2129..fd90775662 100644 --- a/src/runtime-rs/crates/shim/src/shim_delete.rs +++ b/src/runtime-rs/crates/shim/src/shim_delete.rs @@ -7,6 +7,7 @@ use anyhow::{Context, Result}; use containerd_shim_protos::api; use protobuf::Message; +use std::{fs, path::Path}; use crate::{shim::ShimExecutor, Error}; @@ -30,6 +31,16 @@ impl ShimExecutor { exited_time.set_seconds(seconds); rsp.set_exited_at(exited_time); + let address = self + .socket_address(&self.args.id) + .context("socket address")?; + let trim_path = address.strip_prefix("unix://").context("trim path")?; + let file_path = Path::new("/").join(trim_path); + let file_path = file_path.as_path(); + if std::fs::metadata(&file_path).is_ok() { + info!(sl!(), "remote socket path: {:?}", &file_path); + fs::remove_file(file_path).ok(); + } service::ServiceManager::cleanup(&self.args.id).context("cleanup")?; Ok(rsp) } diff --git a/src/runtime-rs/crates/shim/src/shim_run.rs b/src/runtime-rs/crates/shim/src/shim_run.rs index 112b835179..cde365780e 100644 --- a/src/runtime-rs/crates/shim/src/shim_run.rs +++ b/src/runtime-rs/crates/shim/src/shim_run.rs @@ -7,6 +7,7 @@ use std::os::unix::io::RawFd; use anyhow::{Context, Result}; +use kata_sys_util::spec::get_bundle_path; use crate::{ logger, @@ -18,7 +19,7 @@ impl ShimExecutor { pub async fn run(&mut self) -> Result<()> { crate::panic_hook::set_panic_hook(); let sid = self.args.id.clone(); - let bundle_path = self.get_bundle_path().context("get bundle")?; + let bundle_path = get_bundle_path().context("get bundle")?; let path = bundle_path.join("log"); let _logger_guard = logger::set_logger(path.to_str().unwrap(), &sid, self.args.debug).context("set logger"); @@ -36,12 +37,18 @@ impl ShimExecutor { async fn do_run(&mut self) -> Result<()> { info!(sl!(), "start to run"); - self.args.validate(false).context("validata")?; + self.args.validate(false).context("validate")?; let server_fd = get_server_fd().context("get server fd")?; - let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd) - .await - .context("new runtime server")?; + let mut service_manager = service::ServiceManager::new( + &self.args.id, + &self.args.publish_binary, + &self.args.address, + &self.args.namespace, + server_fd, + ) + .await + .context("new shim server")?; service_manager.run().await.context("run")?; Ok(()) diff --git a/src/runtime-rs/crates/shim/src/shim_start.rs b/src/runtime-rs/crates/shim/src/shim_start.rs index a840532091..414e0ccf3f 100644 --- a/src/runtime-rs/crates/shim/src/shim_start.rs +++ b/src/runtime-rs/crates/shim/src/shim_start.rs @@ -12,6 +12,7 @@ use std::{ }; use anyhow::{anyhow, Context, Result}; +use kata_sys_util::spec::get_bundle_path; use kata_types::{container::ContainerType, k8s}; use unix_socket::UnixListener; @@ -32,7 +33,7 @@ impl ShimExecutor { } fn do_start(&mut self) -> Result { - let bundle_path = self.get_bundle_path().context("get bundle path")?; + let bundle_path = get_bundle_path().context("get bundle path")?; let spec = self.load_oci_spec(&bundle_path)?; let (container_type, id) = k8s::container_type_with_id(&spec); @@ -66,7 +67,7 @@ impl ShimExecutor { return Err(anyhow!("invalid param")); } - let bundle_path = self.get_bundle_path().context("get bundle path")?; + let bundle_path = get_bundle_path().context("get bundle path")?; let self_exec = std::env::current_exe().map_err(Error::SelfExec)?; let mut command = std::process::Command::new(self_exec); @@ -109,7 +110,7 @@ impl ShimExecutor { fn get_shim_info_from_sandbox(&self, sandbox_id: &str) -> Result<(PathBuf, u32)> { // All containers of a pod share the same pod socket address. let address = self.socket_address(sandbox_id).context("socket address")?; - let bundle_path = self.get_bundle_path().context("get bundle path")?; + let bundle_path = get_bundle_path().context("get bundle path")?; let parent_bundle_path = Path::new(&bundle_path) .parent() .unwrap_or_else(|| Path::new("")); @@ -165,19 +166,13 @@ mod tests { let cmd = executor.new_command().unwrap(); assert_eq!(cmd.get_args().len(), 8); assert_eq!(cmd.get_envs().len(), 1); - assert_eq!( - cmd.get_current_dir().unwrap(), - executor.get_bundle_path().unwrap() - ); + assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap()); executor.args.debug = true; let cmd = executor.new_command().unwrap(); assert_eq!(cmd.get_args().len(), 9); assert_eq!(cmd.get_envs().len(), 1); - assert_eq!( - cmd.get_current_dir().unwrap(), - executor.get_bundle_path().unwrap() - ); + assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap()); } #[test]