runtime-rs: enhance runtimes

1. support oom event
2. use ContainerProcess to store container_id and exec_id
3. support stats

Fixes: #3785
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
This commit is contained in:
Zhongtao Hu 2022-03-26 17:33:41 +08:00 committed by Fupan Li
parent 9887272db9
commit 10343b1f3d
23 changed files with 674 additions and 175 deletions

View File

@ -26,6 +26,7 @@ dependencies = [
"futures 0.1.31",
"kata-types",
"log",
"logging",
"oci",
"protobuf",
"protocols",
@ -61,9 +62,9 @@ checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "async-trait"
version = "0.1.52"
version = "0.1.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
dependencies = [
"proc-macro2",
"quote",
@ -93,9 +94,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
@ -170,7 +171,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time 0.1.43",
"time 0.1.44",
"winapi",
]
@ -178,6 +179,7 @@ dependencies = [
name = "common"
version = "0.1.0"
dependencies = [
"agent",
"anyhow",
"async-trait",
"containerd-shim-protos",
@ -434,13 +436,24 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.5"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -463,9 +476,9 @@ checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
name = "git2"
version = "0.14.2"
version = "0.13.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3826a6e0e2215d7a41c2bfc7c9244123969273f3476b939a226aac0ab56e9e3c"
checksum = "f29229cc1b24c0e6062f6e742aa3e256492a5323365e5ed3413599f8a5eff7d6"
dependencies = [
"bitflags",
"libc",
@ -547,9 +560,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "1.8.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee"
dependencies = [
"autocfg",
"hashbrown",
@ -601,6 +614,7 @@ dependencies = [
name = "kata-sys-util"
version = "0.1.0"
dependencies = [
"byteorder",
"cgroups-rs",
"chrono",
"common-path",
@ -611,6 +625,7 @@ dependencies = [
"nix 0.23.1",
"oci",
"once_cell",
"rand 0.7.3",
"serde_json",
"slog",
"slog-scope",
@ -643,15 +658,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.121"
version = "0.2.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259"
[[package]]
name = "libgit2-sys"
version = "0.13.2+1.4.2"
version = "0.12.26+1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a42de9a51a5c12e00fc0e4ca6bc2ea43582fc6418488e8f615e905d886f258b"
checksum = "19e1c899248e606fbfe68dcb31d8b0176ebab833b103824af31bddf4b7457494"
dependencies = [
"cc",
"libc",
@ -683,18 +698,19 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.6"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [
"cfg-if 1.0.0",
]
@ -929,9 +945,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"
[[package]]
name = "ppv-lite86"
@ -965,9 +981,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
dependencies = [
"unicode-xid",
]
@ -1065,9 +1081,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.16"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1"
dependencies = [
"proc-macro2",
]
@ -1095,6 +1111,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom 0.1.16",
"libc",
"rand_chacha 0.2.2",
"rand_core 0.5.1",
"rand_hc",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -1102,10 +1131,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_chacha 0.3.1",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core 0.5.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@ -1131,13 +1170,31 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom 0.1.16",
]
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
"getrandom 0.2.6",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core 0.5.1",
]
[[package]]
@ -1151,9 +1208,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.2.11"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42"
dependencies = [
"bitflags",
]
@ -1260,9 +1317,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d"
checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4"
[[package]]
name = "serde"
@ -1389,9 +1446,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
[[package]]
name = "slog"
@ -1413,14 +1470,14 @@ dependencies = [
[[package]]
name = "slog-json"
version = "2.6.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70f825ce7346f40aa318111df5d3a94945a7fdca9081584cb9b05692fb3dfcb4"
checksum = "3e1e53f61af1e3c8b852eef0a9dee29008f55d6dd63794f3f12cef786cf0f219"
dependencies = [
"serde",
"serde_json",
"slog",
"time 0.3.7",
"time 0.3.9",
]
[[package]]
@ -1436,9 +1493,9 @@ dependencies = [
[[package]]
name = "slog-stdlog"
version = "4.1.0"
version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8228ab7302adbf4fcb37e66f3cda78003feb521e7fd9e3847ec117a7784d0f5a"
checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e"
dependencies = [
"log",
"slog",
@ -1495,9 +1552,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.89"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
dependencies = [
"proc-macro2",
"quote",
@ -1562,19 +1619,20 @@ dependencies = [
[[package]]
name = "time"
version = "0.1.43"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "time"
version = "0.3.7"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa",
"libc",
@ -1815,9 +1873,15 @@ dependencies = [
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"

View File

@ -21,6 +21,7 @@ tokio = { version = "1.8.0", features = ["fs", "rt"] }
url = "2.2.2"
kata-types = { path = "../../../libs/kata-types"}
logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" }
protocols = { path = "../../../libs/protocols", features=["async"] }

View File

@ -20,14 +20,11 @@ fn new_ttrpc_ctx(timeout: i64) -> ttrpc_ctx::Context {
#[async_trait]
impl AgentManager for KataAgent {
async fn set_socket_address(&self, address: &str) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.socket_address = address.to_string();
Ok(())
}
async fn start(&self) -> Result<()> {
info!(sl!(), "begin to connect agent");
async fn start(&self, address: &str) -> Result<()> {
info!(sl!(), "begin to connect agent {:?}", address);
self.set_socket_address(address)
.await
.context("set socket")?;
self.connect_agent_server()
.await
.context("connect agent server")?;

View File

@ -82,6 +82,12 @@ impl KataAgent {
})
}
pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.socket_address = address.to_string();
Ok(())
}
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
let mut inner = self.inner.lock().await;

View File

@ -254,8 +254,8 @@ impl From<agent::Routes> for Routes {
impl From<CreateContainerRequest> for agent::CreateContainerRequest {
fn from(from: CreateContainerRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
string_user: from_option(from.string_user),
devices: from_vec(from.devices),
storages: from_vec(from.storages),
@ -321,8 +321,8 @@ impl From<ContainerID> for agent::ResumeContainerRequest {
impl From<SignalProcessRequest> for agent::SignalProcessRequest {
fn from(from: SignalProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
signal: from.signal,
unknown_fields: Default::default(),
cached_size: Default::default(),
@ -333,8 +333,8 @@ impl From<SignalProcessRequest> for agent::SignalProcessRequest {
impl From<WaitProcessRequest> for agent::WaitProcessRequest {
fn from(from: WaitProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
@ -355,8 +355,8 @@ impl From<UpdateContainerRequest> for agent::UpdateContainerRequest {
impl From<WriteStreamRequest> for agent::WriteStreamRequest {
fn from(from: WriteStreamRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
data: from.data,
unknown_fields: Default::default(),
cached_size: Default::default(),
@ -373,8 +373,8 @@ impl From<agent::WriteStreamResponse> for WriteStreamResponse {
impl From<ExecProcessRequest> for agent::ExecProcessRequest {
fn from(from: ExecProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
string_user: from_option(from.string_user),
process: from_option(from.process),
unknown_fields: Default::default(),
@ -523,8 +523,8 @@ impl From<agent::StatsContainerResponse> for StatsContainerResponse {
impl From<ReadStreamRequest> for agent::ReadStreamRequest {
fn from(from: ReadStreamRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
len: from.len,
unknown_fields: Default::default(),
cached_size: Default::default(),
@ -541,8 +541,8 @@ impl From<agent::ReadStreamResponse> for ReadStreamResponse {
impl From<CloseStdinRequest> for agent::CloseStdinRequest {
fn from(from: CloseStdinRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
@ -552,8 +552,8 @@ impl From<CloseStdinRequest> for agent::CloseStdinRequest {
impl From<TtyWinResizeRequest> for agent::TtyWinResizeRequest {
fn from(from: TtyWinResizeRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
container_id: from.process_id.container_id(),
exec_id: from.process_id.exec_id(),
row: from.row,
column: from.column,
unknown_fields: Default::default(),

View File

@ -7,11 +7,7 @@
#[macro_use]
extern crate slog;
macro_rules! sl {
() => {
slog_scope::logger().new(slog::o!("subsystem" => "agent"))
};
}
logging::logger_with_subsystem!(sl, "agent");
pub mod kata;
mod log_forwarder;
@ -19,14 +15,15 @@ mod sock;
mod types;
pub use types::{
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, BlkioStatsEntry, CheckRequest,
CloseStdinRequest, ContainerID, CopyFileRequest, CreateContainerRequest, CreateSandboxRequest,
Empty, ExecProcessRequest, GetGuestDetailsRequest, GuestDetailsResponse, HealthCheckResponse,
IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest, MemHotplugByProbeRequest,
OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, ReadStreamResponse,
RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes, SetGuestDateTimeRequest,
SignalProcessRequest, StatsContainerResponse, Storage, TtyWinResizeRequest,
UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest, VersionCheckResponse,
WaitProcessRequest, WaitProcessResponse, WriteStreamRequest, WriteStreamResponse,
CloseStdinRequest, ContainerID, ContainerProcessID, CopyFileRequest, CreateContainerRequest,
CreateSandboxRequest, Empty, ExecProcessRequest, GetGuestDetailsRequest, GuestDetailsResponse,
HealthCheckResponse, IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest,
MemHotplugByProbeRequest, OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest,
ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes,
SetGuestDateTimeRequest, SignalProcessRequest, StatsContainerResponse, Storage,
TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest,
VersionCheckResponse, WaitProcessRequest, WaitProcessResponse, WriteStreamRequest,
WriteStreamResponse,
};
use anyhow::Result;
@ -34,8 +31,7 @@ use async_trait::async_trait;
#[async_trait]
pub trait AgentManager: Send + Sync {
async fn set_socket_address(&self, address: &str) -> Result<()>;
async fn start(&self) -> Result<()>;
async fn start(&self, address: &str) -> Result<()>;
async fn stop(&self);
}
@ -57,6 +53,7 @@ pub trait Agent: AgentManager + HealthService + Send + Sync {
async fn list_routes(&self, req: Empty) -> Result<Routes>;
async fn update_interface(&self, req: UpdateInterfaceRequest) -> Result<Interface>;
async fn update_routes(&self, req: UpdateRoutesRequest) -> Result<Routes>;
// container
async fn create_container(&self, req: CreateContainerRequest) -> Result<Empty>;
async fn pause_container(&self, req: ContainerID) -> Result<Empty>;

View File

@ -56,7 +56,7 @@ impl Sock for HybridVsock {
}
}
}
Err(anyhow!("cannot connect to agent ttrpc server"))
Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
}
}

View File

@ -79,6 +79,7 @@ impl AsyncRead for Stream {
}
/// Connect config
#[derive(Debug)]
pub struct ConnectConfig {
dial_timeout_ms: u64,
reconnect_timeout_ms: u64,

View File

@ -97,8 +97,7 @@ pub struct Routes {
#[derive(PartialEq, Clone, Default)]
pub struct CreateContainerRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub string_user: Option<StringUser>,
pub devices: Vec<Device>,
pub storages: Vec<Storage>,
@ -121,6 +120,29 @@ impl ContainerID {
}
}
#[derive(PartialEq, Clone, Default)]
pub struct ContainerProcessID {
pub container_id: ContainerID,
pub exec_id: String,
}
impl ContainerProcessID {
pub fn new(container_id: &str, exec_id: &str) -> Self {
Self {
container_id: ContainerID::new(container_id),
exec_id: exec_id.to_string(),
}
}
pub fn container_id(&self) -> String {
self.container_id.container_id.clone()
}
pub fn exec_id(&self) -> String {
self.exec_id.clone()
}
}
#[derive(PartialEq, Clone, Debug, Default)]
pub struct RemoveContainerRequest {
pub container_id: String,
@ -138,15 +160,13 @@ impl RemoveContainerRequest {
#[derive(PartialEq, Clone, Default)]
pub struct SignalProcessRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub signal: u32,
}
#[derive(PartialEq, Clone, Default)]
pub struct WaitProcessRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
}
#[derive(PartialEq, Clone, Default)]
@ -165,8 +185,7 @@ pub struct UpdateContainerRequest {
#[derive(PartialEq, Clone, Default)]
pub struct WriteStreamRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub data: Vec<u8>,
}
@ -177,8 +196,7 @@ pub struct WriteStreamResponse {
#[derive(PartialEq, Clone, Default)]
pub struct ExecProcessRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub string_user: Option<StringUser>,
pub process: Option<oci::Process>,
}
@ -297,8 +315,7 @@ pub struct WaitProcessResponse {
#[derive(PartialEq, Clone, Default)]
pub struct ReadStreamRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub len: u32,
}
@ -309,14 +326,12 @@ pub struct ReadStreamResponse {
#[derive(PartialEq, Clone, Default)]
pub struct CloseStdinRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
}
#[derive(PartialEq, Clone, Default)]
pub struct TtyWinResizeRequest {
pub container_id: String,
pub exec_id: String,
pub process_id: ContainerProcessID,
pub row: u32,
pub column: u32,
}

View File

@ -21,6 +21,7 @@ thiserror = "^1.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread", "process", "fs"] }
ttrpc = { version = "0.6.0" }
agent = { path = "../../agent" }
kata-sys-util = { path = "../../../../libs/kata-sys-util" }
kata-types = { path = "../../../../libs/kata-types" }
oci = { path = "../../../../libs/oci" }

View File

@ -16,7 +16,7 @@ use crate::types::{
#[async_trait]
pub trait ContainerManager: Send + Sync {
// container lifecycle
async fn create_container(&self, config: ContainerConfig) -> Result<PID>;
async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID>;
async fn pause_container(&self, container_id: &ContainerID) -> Result<()>;
async fn resume_container(&self, container_id: &ContainerID) -> Result<()>;
async fn stats_container(&self, container_id: &ContainerID) -> Result<StatsInfo>;

View File

@ -3,8 +3,10 @@
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;
use anyhow::{Context, Result};
use containerd_shim_protos::{events::task::TaskOOM, protobuf::Message as ProtobufMessage};
use tokio::sync::mpsc::{channel, Receiver, Sender};
/// message receiver buffer size
@ -15,8 +17,12 @@ pub enum Action {
Start,
Stop,
Shutdown,
Event(Arc<dyn Event>),
}
unsafe impl Send for Message {}
unsafe impl Sync for Message {}
#[derive(Debug)]
pub struct Message {
pub action: Action,
@ -42,3 +48,25 @@ impl Message {
)
}
}
const TASK_OOM_EVENT_TOPIC: &str = "/tasks/oom";
pub trait Event: std::fmt::Debug + Send {
fn r#type(&self) -> String;
fn type_url(&self) -> String;
fn value(&self) -> Result<Vec<u8>>;
}
impl Event for TaskOOM {
fn r#type(&self) -> String {
TASK_OOM_EVENT_TOPIC.to_string()
}
fn type_url(&self) -> String {
"containerd.events.TaskOOM".to_string()
}
fn value(&self) -> Result<Vec<u8>> {
self.write_to_bytes().context("get oom value")
}
}

View File

@ -7,9 +7,11 @@
use anyhow::Result;
use async_trait::async_trait;
use kata_types::config::TomlConfig;
#[async_trait]
pub trait Sandbox: Send + Sync {
async fn start(&self) -> Result<()>;
async fn start(&self, netns: Option<String>, config: &TomlConfig) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn cleanup(&self, container_id: &str) -> Result<()>;
async fn shutdown(&self) -> Result<()>;

View File

@ -4,7 +4,9 @@
// SPDX-License-Identifier: Apache-2.0
//
mod trans_from_agent;
mod trans_from_shim;
mod trans_into_agent;
mod trans_into_shim;
use std::fmt;
@ -69,6 +71,12 @@ pub struct ContainerID {
pub container_id: String,
}
impl ToString for ContainerID {
fn to_string(&self) -> String {
self.container_id.clone()
}
}
impl ContainerID {
pub fn new(container_id: &str) -> Result<Self> {
validate::verify_id(container_id).context("verify container id")?;
@ -105,6 +113,14 @@ impl ContainerProcess {
process_type,
})
}
pub fn container_id(&self) -> &str {
&self.container_id.container_id
}
pub fn exec_id(&self) -> &str {
&self.exec_id
}
}
#[derive(Debug, Clone)]
pub struct ContainerConfig {
@ -130,7 +146,7 @@ impl PID {
#[derive(Debug, Clone)]
pub struct KillRequest {
pub process_id: ContainerProcess,
pub process: ContainerProcess,
pub signal: u32,
pub all: bool,
}
@ -143,14 +159,14 @@ pub struct ShutdownRequest {
#[derive(Debug, Clone)]
pub struct ResizePTYRequest {
pub process_id: ContainerProcess,
pub process: ContainerProcess,
pub width: u32,
pub height: u32,
}
#[derive(Debug, Clone)]
pub struct ExecProcessRequest {
pub process_id: ContainerProcess,
pub process: ContainerProcess,
pub terminal: bool,
pub stdin: Option<String>,
pub stdout: Option<String>,

View File

@ -0,0 +1,214 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::convert::From;
use containerd_shim_protos::cgroups::metrics;
use protobuf::Message;
use super::{StatsInfo, StatsInfoValue};
// TODO: trans from agent proto?
impl From<Option<agent::StatsContainerResponse>> for StatsInfo {
fn from(c_stats: Option<agent::StatsContainerResponse>) -> Self {
let mut metric = metrics::Metrics::new();
let stats = match c_stats {
None => {
return StatsInfo { value: None };
}
Some(stats) => stats,
};
if let Some(cg_stats) = stats.cgroup_stats {
if let Some(cpu) = cg_stats.cpu_stats {
// set protobuf cpu stat
let mut p_cpu = metrics::CPUStat::new();
if let Some(usage) = cpu.cpu_usage {
let mut p_usage = metrics::CPUUsage::new();
p_usage.set_total(usage.total_usage);
p_usage.set_per_cpu(usage.percpu_usage);
p_usage.set_kernel(usage.usage_in_kernelmode);
p_usage.set_user(usage.usage_in_usermode);
// set protobuf cpu usage
p_cpu.set_usage(p_usage);
}
if let Some(throttle) = cpu.throttling_data {
let mut p_throttle = metrics::Throttle::new();
p_throttle.set_periods(throttle.periods);
p_throttle.set_throttled_time(throttle.throttled_time);
p_throttle.set_throttled_periods(throttle.throttled_periods);
// set protobuf cpu usage
p_cpu.set_throttling(p_throttle);
}
metric.set_cpu(p_cpu);
}
if let Some(m_stats) = cg_stats.memory_stats {
let mut p_m = metrics::MemoryStat::new();
p_m.set_cache(m_stats.cache);
// memory usage
if let Some(m_data) = m_stats.usage {
let mut p_m_entry = metrics::MemoryEntry::new();
p_m_entry.set_usage(m_data.usage);
p_m_entry.set_limit(m_data.limit);
p_m_entry.set_failcnt(m_data.failcnt);
p_m_entry.set_max(m_data.max_usage);
p_m.set_usage(p_m_entry);
}
// memory swap_usage
if let Some(m_data) = m_stats.swap_usage {
let mut p_m_entry = metrics::MemoryEntry::new();
p_m_entry.set_usage(m_data.usage);
p_m_entry.set_limit(m_data.limit);
p_m_entry.set_failcnt(m_data.failcnt);
p_m_entry.set_max(m_data.max_usage);
p_m.set_swap(p_m_entry);
}
// memory kernel_usage
if let Some(m_data) = m_stats.kernel_usage {
let mut p_m_entry = metrics::MemoryEntry::new();
p_m_entry.set_usage(m_data.usage);
p_m_entry.set_limit(m_data.limit);
p_m_entry.set_failcnt(m_data.failcnt);
p_m_entry.set_max(m_data.max_usage);
p_m.set_kernel(p_m_entry);
}
for (k, v) in m_stats.stats {
match k.as_str() {
"dirty" => p_m.set_dirty(v),
"rss" => p_m.set_rss(v),
"rss_huge" => p_m.set_rss_huge(v),
"mapped_file" => p_m.set_mapped_file(v),
"writeback" => p_m.set_writeback(v),
"pg_pg_in" => p_m.set_pg_pg_in(v),
"pg_pg_out" => p_m.set_pg_pg_out(v),
"pg_fault" => p_m.set_pg_fault(v),
"pg_maj_fault" => p_m.set_pg_maj_fault(v),
"inactive_file" => p_m.set_inactive_file(v),
"inactive_anon" => p_m.set_inactive_anon(v),
"active_file" => p_m.set_active_file(v),
"unevictable" => p_m.set_unevictable(v),
"hierarchical_memory_limit" => p_m.set_hierarchical_memory_limit(v),
"hierarchical_swap_limit" => p_m.set_hierarchical_swap_limit(v),
"total_cache" => p_m.set_total_cache(v),
"total_rss" => p_m.set_total_rss(v),
"total_mapped_file" => p_m.set_total_mapped_file(v),
"total_dirty" => p_m.set_total_dirty(v),
"total_pg_pg_in" => p_m.set_total_pg_pg_in(v),
"total_pg_pg_out" => p_m.set_total_pg_pg_out(v),
"total_pg_fault" => p_m.set_total_pg_fault(v),
"total_pg_maj_fault" => p_m.set_total_pg_maj_fault(v),
"total_inactive_file" => p_m.set_total_inactive_file(v),
"total_inactive_anon" => p_m.set_total_inactive_anon(v),
"total_active_file" => p_m.set_total_active_file(v),
"total_unevictable" => p_m.set_total_unevictable(v),
_ => (),
}
}
metric.set_memory(p_m);
}
if let Some(pid_stats) = cg_stats.pids_stats {
let mut p_pid = metrics::PidsStat::new();
p_pid.set_limit(pid_stats.limit);
p_pid.set_current(pid_stats.current);
metric.set_pids(p_pid);
}
if let Some(blk_stats) = cg_stats.blkio_stats {
let mut p_blk_stats = metrics::BlkIOStat::new();
p_blk_stats
.set_io_serviced_recursive(copy_blkio_entry(&blk_stats.io_serviced_recursive));
p_blk_stats.set_io_service_bytes_recursive(copy_blkio_entry(
&blk_stats.io_service_bytes_recursive,
));
p_blk_stats
.set_io_queued_recursive(copy_blkio_entry(&blk_stats.io_queued_recursive));
p_blk_stats.set_io_service_time_recursive(copy_blkio_entry(
&blk_stats.io_service_time_recursive,
));
p_blk_stats.set_io_wait_time_recursive(copy_blkio_entry(
&blk_stats.io_wait_time_recursive,
));
p_blk_stats
.set_io_merged_recursive(copy_blkio_entry(&blk_stats.io_merged_recursive));
p_blk_stats.set_io_time_recursive(copy_blkio_entry(&blk_stats.io_time_recursive));
p_blk_stats.set_sectors_recursive(copy_blkio_entry(&blk_stats.sectors_recursive));
metric.set_blkio(p_blk_stats);
}
if !cg_stats.hugetlb_stats.is_empty() {
let mut p_huge = ::protobuf::RepeatedField::new();
for (k, v) in cg_stats.hugetlb_stats {
let mut h = metrics::HugetlbStat::new();
h.set_pagesize(k);
h.set_max(v.max_usage);
h.set_usage(v.usage);
h.set_failcnt(v.failcnt);
p_huge.push(h);
}
metric.set_hugetlb(p_huge);
}
}
let net_stats = stats.network_stats;
if !net_stats.is_empty() {
let mut p_net = ::protobuf::RepeatedField::new();
for v in net_stats.iter() {
let mut h = metrics::NetworkStat::new();
h.set_name(v.name.clone());
h.set_tx_bytes(v.tx_bytes);
h.set_tx_packets(v.tx_packets);
h.set_tx_errors(v.tx_errors);
h.set_tx_dropped(v.tx_dropped);
h.set_rx_bytes(v.rx_bytes);
h.set_rx_packets(v.rx_packets);
h.set_rx_errors(v.rx_errors);
h.set_rx_dropped(v.rx_dropped);
p_net.push(h);
}
metric.set_network(p_net);
}
StatsInfo {
value: Some(StatsInfoValue {
type_url: "io.containerd.cgroups.v1.Metrics".to_string(),
value: metric.write_to_bytes().unwrap(),
}),
}
}
}
fn copy_blkio_entry(
entry: &[agent::BlkioStatsEntry],
) -> ::protobuf::RepeatedField<metrics::BlkIOEntry> {
let mut p_entry = ::protobuf::RepeatedField::new();
for e in entry.iter() {
let mut blk = metrics::BlkIOEntry::new();
blk.set_op(e.op.clone());
blk.set_value(e.value);
blk.set_major(e.major);
blk.set_minor(e.minor);
p_entry.push(blk);
}
p_entry
}

View File

@ -82,7 +82,7 @@ impl TryFrom<api::ExecProcessRequest> for Request {
fn try_from(from: api::ExecProcessRequest) -> Result<Self> {
let spec = from.get_spec();
Ok(Request::ExecProcess(ExecProcessRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
terminal: from.terminal,
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
stdout: (!from.stdout.is_empty()).then(|| from.stdout.clone()),
@ -97,7 +97,7 @@ impl TryFrom<api::KillRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::KillRequest) -> Result<Self> {
Ok(Request::KillProcess(KillRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
signal: from.signal,
all: from.all,
}))
@ -145,7 +145,7 @@ impl TryFrom<api::ResizePtyRequest> for Request {
type Error = anyhow::Error;
fn try_from(from: api::ResizePtyRequest) -> Result<Self> {
Ok(Request::ResizeProcessPTY(ResizePTYRequest {
process_id: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
width: from.width,
height: from.height,
}))

View File

@ -0,0 +1,28 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::convert::From;
use agent;
use super::{ContainerID, ContainerProcess};
impl From<ContainerID> for agent::ContainerID {
fn from(from: ContainerID) -> Self {
Self {
container_id: from.container_id,
}
}
}
impl From<ContainerProcess> for agent::ContainerProcessID {
fn from(from: ContainerProcess) -> Self {
Self {
container_id: from.container_id.into(),
exec_id: from.exec_id,
}
}
}

View File

@ -25,7 +25,7 @@ use wasm_container::WasmContainer;
struct RuntimeHandlerManagerInner {
id: String,
msg_sender: Sender<Message>,
runtime_instance: Option<RuntimeInstance>,
runtime_instance: Option<Arc<RuntimeInstance>>,
}
impl RuntimeHandlerManagerInner {
@ -37,26 +37,22 @@ impl RuntimeHandlerManagerInner {
})
}
async fn init_runtime_handler(&mut self, runtime_name: &str) -> Result<()> {
info!(sl!(), "new runtime handler {}", runtime_name);
let runtime_handler = match runtime_name {
async fn init_runtime_handler(
&mut self,
netns: Option<String>,
config: &TomlConfig,
) -> Result<()> {
info!(sl!(), "new runtime handler {}", &config.runtime.name);
let runtime_handler = match config.runtime.name.as_str() {
#[cfg(feature = "linux")]
name if name == LinuxContainer::name() => {
LinuxContainer::init().context("init linux container")?;
LinuxContainer::new_handler()
}
name if name == LinuxContainer::name() => LinuxContainer::new_handler(),
#[cfg(feature = "wasm")]
name if name == WasmContainer::name() => {
WasmContainer::init().context("init wasm container")?;
WasmContainer::new_handler()
}
name if name == WasmContainer::name() => WasmContainer::new_handler(),
#[cfg(feature = "virt")]
name if name == VirtContainer::name() => {
VirtContainer::init().context("init virt container")?;
name if name == VirtContainer::name() || name.is_empty() => {
VirtContainer::new_handler()
}
_ => return Err(anyhow!("Unsupported runtime: {}", runtime_name)),
_ => return Err(anyhow!("Unsupported runtime: {}", &config.runtime.name)),
};
let runtime_instance = runtime_handler
.new_instance(&self.id, self.msg_sender.clone())
@ -66,10 +62,10 @@ impl RuntimeHandlerManagerInner {
// start sandbox
runtime_instance
.sandbox
.start()
.start(netns, config)
.await
.context("start sandbox")?;
self.runtime_instance = Some(runtime_instance);
self.runtime_instance = Some(Arc::new(runtime_instance));
Ok(())
}
@ -79,15 +75,39 @@ impl RuntimeHandlerManagerInner {
return Ok(());
}
#[cfg(feature = "linux")]
LinuxContainer::init().context("init linux container")?;
#[cfg(feature = "wasm")]
WasmContainer::init().context("init wasm container")?;
#[cfg(feature = "virt")]
VirtContainer::init().context("init virt container")?;
let netns = if let Some(linux) = &spec.linux {
let mut netns = None;
for ns in &linux.namespaces {
if ns.r#type.as_str() != oci::NETWORKNAMESPACE {
continue;
}
if !ns.path.is_empty() {
netns = Some(ns.path.clone());
break;
}
}
netns
} else {
None
};
let config = load_config(spec).context("load config")?;
self.init_runtime_handler(&config.runtime.name)
self.init_runtime_handler(netns, &config)
.await
.context("init runtime handler")?;
Ok(())
}
fn get_runtime_instance(&self) -> Option<RuntimeInstance> {
fn get_runtime_instance(&self) -> Option<Arc<RuntimeInstance>> {
self.runtime_instance.clone()
}
}
@ -112,25 +132,35 @@ impl RuntimeHandlerManager {
Ok(())
}
async fn get_runtime_instance(&self) -> Result<Arc<RuntimeInstance>> {
let inner = self.inner.read().await;
inner
.get_runtime_instance()
.ok_or_else(|| anyhow!("runtime not ready"))
}
async fn try_init_runtime_instance(&self, spec: &oci::Spec) -> Result<()> {
let mut inner = self.inner.write().await;
inner.try_init(spec).await
}
pub async fn handler_message(&self, req: Request) -> Result<Response> {
if let Request::CreateContainer(req) = req {
// get oci spec
let bundler_path = format!("{}/{}", req.bundle, oci::OCI_SPEC_CONFIG_FILE_NAME);
let spec = oci::Spec::load(&bundler_path).context("load spec")?;
let mut inner = self.inner.write().await;
inner
.try_init(&spec)
self.try_init_runtime_instance(&spec)
.await
.context("try init runtime handler")?;
let instance = inner
.context("try init runtime instance")?;
let instance = self
.get_runtime_instance()
.ok_or_else(|| anyhow!("runtime not ready"))?;
.await
.context("get runtime instance")?;
let shim_pid = instance
.container_manager
.create_container(req)
.create_container(req, spec)
.await
.context("create container")?;
Ok(Response::CreateContainer(shim_pid))
@ -140,12 +170,12 @@ impl RuntimeHandlerManager {
}
pub async fn handler_request(&self, req: Request) -> Result<Response> {
let inner = self.inner.read().await;
let instance = inner
let instance = self
.get_runtime_instance()
.ok_or_else(|| anyhow!("runtime not ready"))?;
let sandbox = instance.sandbox;
let cm = instance.container_manager;
.await
.context("get runtime instance")?;
let sandbox = instance.sandbox.clone();
let cm = instance.container_manager.clone();
match req {
Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)),

View File

@ -5,15 +5,24 @@
//
use std::{
fs,
os::unix::io::{FromRawFd, RawFd},
process::Stdio,
sync::Arc,
};
use anyhow::{Context, Result};
use common::message::{Action, Message};
use containerd_shim_protos::shim_async;
use common::message::{Action, Event, Message};
use containerd_shim_protos::{
protobuf::{well_known_types::Any, Message as ProtobufMessage},
shim_async,
};
use runtimes::RuntimeHandlerManager;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::{
io::AsyncWriteExt,
process::Command,
sync::mpsc::{channel, Receiver},
};
use ttrpc::asynchronous::Server;
use crate::task_service::TaskService;
@ -21,14 +30,66 @@ use crate::task_service::TaskService;
/// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8;
pub const KATA_PATH: &str = "/run/kata";
pub struct ServiceManager {
receiver: Option<Receiver<Message>>,
handler: Arc<RuntimeHandlerManager>,
task_server: Option<Server>,
binary: String,
address: String,
namespace: String,
}
async fn send_event(
containerd_binary: String,
address: String,
namespace: String,
event: Arc<dyn Event>,
) -> Result<()> {
let any = Any {
type_url: event.type_url(),
value: event.value().context("get event value")?,
..Default::default()
};
let data = any.write_to_bytes().context("write to any")?;
let mut child = Command::new(containerd_binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args(&[
"--address",
&address,
"publish",
"--topic",
&event.r#type(),
"--namespace",
&namespace,
])
.spawn()
.context("sawn cmd")?;
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
stdin
.write_all(&data)
.await
.context("failed to write to stdin")?;
let output = child
.wait_with_output()
.await
.context("failed to read stdout")?;
info!(sl!(), "get output: {:?}", output);
Ok(())
}
impl ServiceManager {
pub async fn new(id: &str, task_server_fd: RawFd) -> Result<Self> {
pub async fn new(
id: &str,
containerd_binary: &str,
address: &str,
namespace: &str,
task_server_fd: RawFd,
) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new(
RuntimeHandlerManager::new(id, sender)
@ -41,13 +102,17 @@ impl ServiceManager {
receiver: Some(receiver),
handler,
task_server: Some(task_server),
binary: containerd_binary.to_string(),
address: address.to_string(),
namespace: namespace.to_string(),
})
}
pub async fn run(&mut self) -> Result<()> {
info!(sl!(), "begin to run service");
self.start().await.context("start")?;
info!(sl!(), "wait server message");
let mut rx = self.receiver.take();
if let Some(rx) = rx.as_mut() {
while let Some(r) = rx.recv().await {
@ -59,9 +124,24 @@ impl ServiceManager {
self.stop_listen().await.context("stop listen")?;
break;
}
Action::Event(event) => {
info!(sl!(), "get event {:?}", &event);
send_event(
self.binary.clone(),
self.address.clone(),
self.namespace.clone(),
event,
)
.await
.context("send event")?;
Ok(())
}
};
if let Some(ref sender) = r.resp_sender {
if let Err(err) = result.as_ref() {
error!(sl!(), "failed to process action {:?}", err);
}
sender.send(result).await.context("send response")?;
}
}
@ -72,8 +152,18 @@ impl ServiceManager {
Ok(())
}
pub fn cleanup(id: &str) -> Result<()> {
RuntimeHandlerManager::cleanup(id)
pub fn cleanup(sid: &str) -> Result<()> {
let temp_dir = [KATA_PATH, sid].join("/");
if std::fs::metadata(temp_dir.as_str()).is_ok() {
// try to remove dir and skip the result
fs::remove_dir_all(temp_dir)
.map_err(|err| {
warn!(sl!(), "failed to clean up sandbox tmp dir");
err
})
.ok();
}
Ok(())
}
async fn start(&mut self) -> Result<()> {

View File

@ -60,10 +60,6 @@ impl ShimExecutor {
data.parse::<u32>().context(Error::ParsePid)
}
pub(crate) fn get_bundle_path(&self) -> Result<PathBuf> {
std::env::current_dir().context(Error::GetBundlePath)
}
pub(crate) fn socket_address(&self, id: &str) -> Result<PathBuf> {
if id.is_empty() {
return Err(anyhow!(Error::EmptySandboxId));
@ -72,8 +68,6 @@ impl ShimExecutor {
let data = [&self.args.address, &self.args.namespace, id].join("/");
let mut hasher = sha2::Sha256::new();
hasher.update(data);
// Follow
// https://github.com/containerd/containerd/blob/main/runtime/v2/shim/util_unix.go#L68 to
// generate a shim socket path.
Ok(PathBuf::from(format!(
@ -89,6 +83,8 @@ mod tests {
use super::*;
use serial_test::serial;
use kata_sys_util::spec::get_bundle_path;
#[test]
#[serial]
fn test_shim_executor() {
@ -111,7 +107,7 @@ mod tests {
executor
.write_address(bundle_path, Path::new("12345"))
.unwrap();
let dir = executor.get_bundle_path().unwrap();
let dir = get_bundle_path().unwrap();
let file_path = &dir.join("address");
let buf = std::fs::read_to_string(file_path).unwrap();
assert_eq!(&buf, "12345");

View File

@ -7,6 +7,7 @@
use anyhow::{Context, Result};
use containerd_shim_protos::api;
use protobuf::Message;
use std::{fs, path::Path};
use crate::{shim::ShimExecutor, Error};
@ -30,6 +31,16 @@ impl ShimExecutor {
exited_time.set_seconds(seconds);
rsp.set_exited_at(exited_time);
let address = self
.socket_address(&self.args.id)
.context("socket address")?;
let trim_path = address.strip_prefix("unix://").context("trim path")?;
let file_path = Path::new("/").join(trim_path);
let file_path = file_path.as_path();
if std::fs::metadata(&file_path).is_ok() {
info!(sl!(), "remote socket path: {:?}", &file_path);
fs::remove_file(file_path).ok();
}
service::ServiceManager::cleanup(&self.args.id).context("cleanup")?;
Ok(rsp)
}

View File

@ -7,6 +7,7 @@
use std::os::unix::io::RawFd;
use anyhow::{Context, Result};
use kata_sys_util::spec::get_bundle_path;
use crate::{
logger,
@ -18,7 +19,7 @@ impl ShimExecutor {
pub async fn run(&mut self) -> Result<()> {
crate::panic_hook::set_panic_hook();
let sid = self.args.id.clone();
let bundle_path = self.get_bundle_path().context("get bundle")?;
let bundle_path = get_bundle_path().context("get bundle")?;
let path = bundle_path.join("log");
let _logger_guard =
logger::set_logger(path.to_str().unwrap(), &sid, self.args.debug).context("set logger");
@ -36,12 +37,18 @@ impl ShimExecutor {
async fn do_run(&mut self) -> Result<()> {
info!(sl!(), "start to run");
self.args.validate(false).context("validata")?;
self.args.validate(false).context("validate")?;
let server_fd = get_server_fd().context("get server fd")?;
let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd)
.await
.context("new runtime server")?;
let mut service_manager = service::ServiceManager::new(
&self.args.id,
&self.args.publish_binary,
&self.args.address,
&self.args.namespace,
server_fd,
)
.await
.context("new shim server")?;
service_manager.run().await.context("run")?;
Ok(())

View File

@ -12,6 +12,7 @@ use std::{
};
use anyhow::{anyhow, Context, Result};
use kata_sys_util::spec::get_bundle_path;
use kata_types::{container::ContainerType, k8s};
use unix_socket::UnixListener;
@ -32,7 +33,7 @@ impl ShimExecutor {
}
fn do_start(&mut self) -> Result<PathBuf> {
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let spec = self.load_oci_spec(&bundle_path)?;
let (container_type, id) = k8s::container_type_with_id(&spec);
@ -66,7 +67,7 @@ impl ShimExecutor {
return Err(anyhow!("invalid param"));
}
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let self_exec = std::env::current_exe().map_err(Error::SelfExec)?;
let mut command = std::process::Command::new(self_exec);
@ -109,7 +110,7 @@ impl ShimExecutor {
fn get_shim_info_from_sandbox(&self, sandbox_id: &str) -> Result<(PathBuf, u32)> {
// All containers of a pod share the same pod socket address.
let address = self.socket_address(sandbox_id).context("socket address")?;
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let parent_bundle_path = Path::new(&bundle_path)
.parent()
.unwrap_or_else(|| Path::new(""));
@ -165,19 +166,13 @@ mod tests {
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 8);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap());
executor.args.debug = true;
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 9);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap());
}
#[test]