Merge pull request #10293 from lsc2001/solve-docker-compatibility

runtime-rs: Notify containerd when process exits
This commit is contained in:
Xuewei Niu 2024-09-26 14:51:20 +08:00 committed by GitHub
commit cb5a2b30e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 125 additions and 27 deletions

View File

@ -6,7 +6,8 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use containerd_shim_protos::{events::task::TaskOOM, protobuf::Message as ProtobufMessage};
use containerd_shim_protos::events::task::{TaskExit, TaskOOM};
use containerd_shim_protos::protobuf::Message as ProtobufMessage;
use tokio::sync::mpsc::{channel, Receiver, Sender};
/// message receiver buffer size
@ -47,7 +48,10 @@ impl Message {
}
const TASK_OOM_EVENT_TOPIC: &str = "/tasks/oom";
const TASK_EXIT_EVENT_TOPIC: &str = "/tasks/exit";
const TASK_OOM_EVENT_URL: &str = "containerd.events.TaskOOM";
const TASK_EXIT_EVENT_URL: &str = "containerd.events.TaskExit";
pub trait Event: std::fmt::Debug + Send {
fn r#type(&self) -> String;
@ -68,3 +72,17 @@ impl Event for TaskOOM {
self.write_to_bytes().context("get oom value")
}
}
impl Event for TaskExit {
fn r#type(&self) -> String {
TASK_EXIT_EVENT_TOPIC.to_string()
}
fn type_url(&self) -> String {
TASK_EXIT_EVENT_URL.to_string()
}
fn value(&self) -> Result<Vec<u8>> {
self.write_to_bytes().context("get exit value")
}
}

View File

@ -4,10 +4,12 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::{types::ContainerProcess, ContainerManager};
use anyhow::Result;
use async_trait::async_trait;
use oci_spec::runtime as oci;
use runtime_spec as spec;
use std::sync::Arc;
#[derive(Clone)]
pub struct SandboxNetworkEnv {
@ -43,6 +45,12 @@ pub trait Sandbox: Send + Sync {
async fn direct_volume_stats(&self, volume_path: &str) -> Result<String>;
async fn direct_volume_resize(&self, resize_req: agent::ResizeVolumeRequest) -> Result<()>;
async fn agent_sock(&self) -> Result<String>;
async fn wait_process(
&self,
cm: Arc<dyn ContainerManager>,
process_id: ContainerProcess,
shim_pid: u32,
) -> Result<()>;
// metrics function
async fn agent_metrics(&self) -> Result<String>;

View File

@ -8,6 +8,7 @@ mod trans_from_agent;
mod trans_from_shim;
mod trans_into_agent;
mod trans_into_shim;
pub mod utils;
use std::fmt;

View File

@ -6,37 +6,16 @@
use std::{
any::type_name,
convert::{Into, TryFrom, TryInto},
time,
convert::{Into, TryFrom},
};
use anyhow::{anyhow, Result};
use containerd_shim_protos::api;
use super::utils::option_system_time_into;
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse};
use crate::error::Error;
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp {
let mut proto_time = ::protobuf::well_known_types::timestamp::Timestamp::new();
proto_time.seconds = time
.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
.try_into()
.unwrap_or_default();
proto_time
}
fn option_system_time_into(
time: Option<time::SystemTime>,
) -> protobuf::MessageField<protobuf::well_known_types::timestamp::Timestamp> {
match time {
Some(v) => ::protobuf::MessageField::some(system_time_into(v)),
None => ::protobuf::MessageField::none(),
}
}
impl From<ProcessExitStatus> for api::WaitResponse {
fn from(from: ProcessExitStatus) -> Self {
Self {

View File

@ -0,0 +1,28 @@
// Copyright 2024 Kata Contributors
//
// SPDX-License-Identifier: Apache-2.0
//
use std::convert::TryInto;
use std::time;
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp {
let mut proto_time = ::protobuf::well_known_types::timestamp::Timestamp::new();
proto_time.seconds = time
.duration_since(time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
.try_into()
.unwrap_or_default();
proto_time
}
pub fn option_system_time_into(
time: Option<time::SystemTime>,
) -> protobuf::MessageField<protobuf::well_known_types::timestamp::Timestamp> {
match time {
Some(v) => ::protobuf::MessageField::some(system_time_into(v)),
None => ::protobuf::MessageField::none(),
}
}

View File

@ -7,7 +7,7 @@
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{TaskRequest, TaskResponse},
types::{ContainerProcess, TaskRequest, TaskResponse},
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
};
use hypervisor::Param;
@ -387,12 +387,27 @@ impl RuntimeHandlerManager {
.await
.context("get runtime instance")?;
let container_id = container_config.container_id.clone();
let shim_pid = instance
.container_manager
.create_container(container_config, spec)
.await
.context("create container")?;
let container_manager = instance.container_manager.clone();
let process_id =
ContainerProcess::new(&container_id, "").context("create container process")?;
let pid = shim_pid.pid;
tokio::spawn(async move {
let result = instance
.sandbox
.wait_process(container_manager, process_id, pid)
.await;
if let Err(e) = result {
error!(sl!(), "sandbox wait process error: {:?}", e);
}
});
Ok(TaskResponse::CreateContainer(shim_pid))
} else {
self.handler_request(req)
@ -451,6 +466,14 @@ impl RuntimeHandlerManager {
.start_process(&process_id)
.await
.context("start process")?;
let pid = shim_pid.pid;
tokio::spawn(async move {
let result = sandbox.wait_process(cm, process_id, pid).await;
if let Err(e) = result {
error!(sl!(), "sandbox wait process error: {:?}", e);
}
});
Ok(TaskResponse::StartProcess(shim_pid))
}

View File

@ -12,8 +12,10 @@ use agent::{
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use common::message::{Action, Message};
use common::{Sandbox, SandboxNetworkEnv};
use containerd_shim_protos::events::task::TaskOOM;
use common::types::utils::option_system_time_into;
use common::types::ContainerProcess;
use common::{ContainerManager, Sandbox, SandboxNetworkEnv};
use containerd_shim_protos::events::task::{TaskExit, TaskOOM};
use hypervisor::VsockConfig;
#[cfg(not(target_arch = "s390x"))]
use hypervisor::{dragonball::Dragonball, HYPERVISOR_DRAGONBALL, HYPERVISOR_FIRECRACKER};
@ -27,6 +29,7 @@ use kata_types::config::hypervisor::HYPERVISOR_NAME_CH;
use kata_types::config::TomlConfig;
use oci_spec::runtime as oci;
use persist::{self, sandbox_persist::Persist};
use protobuf::SpecialFields;
use resource::manager::ManagerArgs;
use resource::network::{dan_config_path, DanNetworkConfig, NetworkConfig, NetworkWithNetNsConfig};
use resource::{ResourceConfig, ResourceManager};
@ -519,6 +522,44 @@ impl Sandbox for VirtSandbox {
Ok(())
}
async fn wait_process(
&self,
cm: Arc<dyn ContainerManager>,
process_id: ContainerProcess,
shim_pid: u32,
) -> Result<()> {
let exit_status = cm.wait_process(&process_id).await?;
info!(sl!(), "container process exited with {:?}", exit_status);
if cm.is_sandbox_container(&process_id).await {
self.stop().await.context("stop sandbox")?;
}
let cid = process_id.container_id();
if cid.is_empty() {
return Err(anyhow!("container id is empty"));
}
let eid = process_id.exec_id();
let id = if eid.is_empty() {
cid.to_string()
} else {
eid.to_string()
};
let event = TaskExit {
container_id: cid.to_string(),
id,
pid: shim_pid,
exit_status: exit_status.exit_code as u32,
exited_at: option_system_time_into(exit_status.exit_time),
special_fields: SpecialFields::new(),
};
let msg = Message::new(Action::Event(Arc::new(event)));
let lock_sender = self.msg_sender.lock().await;
lock_sender.send(msg).await.context("send exit event")?;
Ok(())
}
async fn agent_sock(&self) -> Result<String> {
self.agent.agent_sock().await
}