diff --git a/src/runtime-rs/crates/runtimes/common/src/message.rs b/src/runtime-rs/crates/runtimes/common/src/message.rs index fa8dbb6fc..832134196 100644 --- a/src/runtime-rs/crates/runtimes/common/src/message.rs +++ b/src/runtime-rs/crates/runtimes/common/src/message.rs @@ -6,7 +6,8 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use containerd_shim_protos::{events::task::TaskOOM, protobuf::Message as ProtobufMessage}; +use containerd_shim_protos::events::task::{TaskExit, TaskOOM}; +use containerd_shim_protos::protobuf::Message as ProtobufMessage; use tokio::sync::mpsc::{channel, Receiver, Sender}; /// message receiver buffer size @@ -47,7 +48,10 @@ impl Message { } const TASK_OOM_EVENT_TOPIC: &str = "/tasks/oom"; +const TASK_EXIT_EVENT_TOPIC: &str = "/tasks/exit"; + const TASK_OOM_EVENT_URL: &str = "containerd.events.TaskOOM"; +const TASK_EXIT_EVENT_URL: &str = "containerd.events.TaskExit"; pub trait Event: std::fmt::Debug + Send { fn r#type(&self) -> String; @@ -68,3 +72,17 @@ impl Event for TaskOOM { self.write_to_bytes().context("get oom value") } } + +impl Event for TaskExit { + fn r#type(&self) -> String { + TASK_EXIT_EVENT_TOPIC.to_string() + } + + fn type_url(&self) -> String { + TASK_EXIT_EVENT_URL.to_string() + } + + fn value(&self) -> Result> { + self.write_to_bytes().context("get exit value") + } +} diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index 379e21242..442461dd3 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -4,10 +4,12 @@ // SPDX-License-Identifier: Apache-2.0 // +use crate::{types::ContainerProcess, ContainerManager}; use anyhow::Result; use async_trait::async_trait; use oci_spec::runtime as oci; use runtime_spec as spec; +use std::sync::Arc; #[derive(Clone)] pub struct SandboxNetworkEnv { @@ -43,6 +45,12 @@ pub trait Sandbox: Send + Sync { async fn direct_volume_stats(&self, volume_path: &str) -> Result; async fn direct_volume_resize(&self, resize_req: agent::ResizeVolumeRequest) -> Result<()>; async fn agent_sock(&self) -> Result; + async fn wait_process( + &self, + cm: Arc, + process_id: ContainerProcess, + shim_pid: u32, + ) -> Result<()>; // metrics function async fn agent_metrics(&self) -> Result; diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index 4d339d660..735603fbd 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -8,6 +8,7 @@ mod trans_from_agent; mod trans_from_shim; mod trans_into_agent; mod trans_into_shim; +pub mod utils; use std::fmt; diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index d73866519..e146ace7b 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -6,37 +6,16 @@ use std::{ any::type_name, - convert::{Into, TryFrom, TryInto}, - time, + convert::{Into, TryFrom}, }; use anyhow::{anyhow, Result}; use containerd_shim_protos::api; +use super::utils::option_system_time_into; use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse}; use crate::error::Error; -fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp { - let mut proto_time = ::protobuf::well_known_types::timestamp::Timestamp::new(); - proto_time.seconds = time - .duration_since(time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - .try_into() - .unwrap_or_default(); - - proto_time -} - -fn option_system_time_into( - time: Option, -) -> protobuf::MessageField { - match time { - Some(v) => ::protobuf::MessageField::some(system_time_into(v)), - None => ::protobuf::MessageField::none(), - } -} - impl From for api::WaitResponse { fn from(from: ProcessExitStatus) -> Self { Self { diff --git a/src/runtime-rs/crates/runtimes/common/src/types/utils.rs b/src/runtime-rs/crates/runtimes/common/src/types/utils.rs new file mode 100644 index 000000000..cbf39e9df --- /dev/null +++ b/src/runtime-rs/crates/runtimes/common/src/types/utils.rs @@ -0,0 +1,28 @@ +// Copyright 2024 Kata Contributors +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::convert::TryInto; +use std::time; + +fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp { + let mut proto_time = ::protobuf::well_known_types::timestamp::Timestamp::new(); + proto_time.seconds = time + .duration_since(time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .try_into() + .unwrap_or_default(); + + proto_time +} + +pub fn option_system_time_into( + time: Option, +) -> protobuf::MessageField { + match time { + Some(v) => ::protobuf::MessageField::some(system_time_into(v)), + None => ::protobuf::MessageField::none(), + } +} diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index ba61a7c3f..90472c535 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Context, Result}; use common::{ message::Message, - types::{TaskRequest, TaskResponse}, + types::{ContainerProcess, TaskRequest, TaskResponse}, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, }; use hypervisor::Param; @@ -387,12 +387,27 @@ impl RuntimeHandlerManager { .await .context("get runtime instance")?; + let container_id = container_config.container_id.clone(); let shim_pid = instance .container_manager .create_container(container_config, spec) .await .context("create container")?; + let container_manager = instance.container_manager.clone(); + let process_id = + ContainerProcess::new(&container_id, "").context("create container process")?; + let pid = shim_pid.pid; + tokio::spawn(async move { + let result = instance + .sandbox + .wait_process(container_manager, process_id, pid) + .await; + if let Err(e) = result { + error!(sl!(), "sandbox wait process error: {:?}", e); + } + }); + Ok(TaskResponse::CreateContainer(shim_pid)) } else { self.handler_request(req) @@ -451,6 +466,14 @@ impl RuntimeHandlerManager { .start_process(&process_id) .await .context("start process")?; + + let pid = shim_pid.pid; + tokio::spawn(async move { + let result = sandbox.wait_process(cm, process_id, pid).await; + if let Err(e) = result { + error!(sl!(), "sandbox wait process error: {:?}", e); + } + }); Ok(TaskResponse::StartProcess(shim_pid)) } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs index 686aa7ee1..94b742645 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs @@ -12,8 +12,10 @@ use agent::{ use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use common::message::{Action, Message}; -use common::{Sandbox, SandboxNetworkEnv}; -use containerd_shim_protos::events::task::TaskOOM; +use common::types::utils::option_system_time_into; +use common::types::ContainerProcess; +use common::{ContainerManager, Sandbox, SandboxNetworkEnv}; +use containerd_shim_protos::events::task::{TaskExit, TaskOOM}; use hypervisor::VsockConfig; #[cfg(not(target_arch = "s390x"))] use hypervisor::{dragonball::Dragonball, HYPERVISOR_DRAGONBALL, HYPERVISOR_FIRECRACKER}; @@ -27,6 +29,7 @@ use kata_types::config::hypervisor::HYPERVISOR_NAME_CH; use kata_types::config::TomlConfig; use oci_spec::runtime as oci; use persist::{self, sandbox_persist::Persist}; +use protobuf::SpecialFields; use resource::manager::ManagerArgs; use resource::network::{dan_config_path, DanNetworkConfig, NetworkConfig, NetworkWithNetNsConfig}; use resource::{ResourceConfig, ResourceManager}; @@ -519,6 +522,44 @@ impl Sandbox for VirtSandbox { Ok(()) } + async fn wait_process( + &self, + cm: Arc, + process_id: ContainerProcess, + shim_pid: u32, + ) -> Result<()> { + let exit_status = cm.wait_process(&process_id).await?; + info!(sl!(), "container process exited with {:?}", exit_status); + + if cm.is_sandbox_container(&process_id).await { + self.stop().await.context("stop sandbox")?; + } + + let cid = process_id.container_id(); + if cid.is_empty() { + return Err(anyhow!("container id is empty")); + } + let eid = process_id.exec_id(); + let id = if eid.is_empty() { + cid.to_string() + } else { + eid.to_string() + }; + + let event = TaskExit { + container_id: cid.to_string(), + id, + pid: shim_pid, + exit_status: exit_status.exit_code as u32, + exited_at: option_system_time_into(exit_status.exit_time), + special_fields: SpecialFields::new(), + }; + let msg = Message::new(Action::Event(Arc::new(event))); + let lock_sender = self.msg_sender.lock().await; + lock_sender.send(msg).await.context("send exit event")?; + Ok(()) + } + async fn agent_sock(&self) -> Result { self.agent.agent_sock().await }