mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-12 14:48:13 +00:00
runtime-rs: register the sandbox api service
add and resiger the sandbox api service, thus runtime-rs can deal with the sandbox api rpc call from the containerd. Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
parent
8332f427d2
commit
36bf080c1e
@ -11,6 +11,7 @@ logging::logger_with_subsystem!(sl, "service");
|
||||
|
||||
mod event;
|
||||
mod manager;
|
||||
mod sandbox_service;
|
||||
mod task_service;
|
||||
|
||||
pub use manager::ServiceManager;
|
||||
|
@ -17,7 +17,9 @@ use tokio::sync::mpsc::{channel, Receiver};
|
||||
use ttrpc::asynchronous::Server;
|
||||
|
||||
use crate::event::{new_event_publisher, Forwarder};
|
||||
use crate::sandbox_service::SandboxService;
|
||||
use crate::task_service::TaskService;
|
||||
use containerd_shim_protos::sandbox_async;
|
||||
|
||||
/// message buffer size
|
||||
const MESSAGE_BUFFER_SIZE: usize = 8;
|
||||
@ -136,25 +138,29 @@ impl ServiceManager {
|
||||
}
|
||||
|
||||
fn registry_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.server.take() {
|
||||
if let Some(s) = self.server.take() {
|
||||
let sandbox_service = Arc::new(Box::new(SandboxService::new(self.handler.clone()))
|
||||
as Box<dyn sandbox_async::Sandbox + Send + Sync>);
|
||||
let s = s.register_service(sandbox_async::create_sandbox(sandbox_service));
|
||||
|
||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||
as Box<dyn shim_async::Task + Send + Sync>);
|
||||
let t = t.register_service(shim_async::create_task(task_service));
|
||||
self.server = Some(t);
|
||||
let s = s.register_service(shim_async::create_task(task_service));
|
||||
self.server = Some(s);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.server.as_mut() {
|
||||
t.start().await.context("task server start")?;
|
||||
if let Some(s) = self.server.as_mut() {
|
||||
s.start().await.context("task server start")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.server.as_mut() {
|
||||
t.stop_listen().await;
|
||||
if let Some(s) = self.server.as_mut() {
|
||||
s.stop_listen().await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
76
src/runtime-rs/crates/service/src/sandbox_service.rs
Normal file
76
src/runtime-rs/crates/service/src/sandbox_service.rs
Normal file
@ -0,0 +1,76 @@
|
||||
// Copyright (c) 2019-2025 Alibaba Cloud
|
||||
// Copyright (c) 2019-2025 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common::types::{SandboxRequest, SandboxResponse};
|
||||
use containerd_shim_protos::{sandbox_api, sandbox_async};
|
||||
use runtimes::RuntimeHandlerManager;
|
||||
use ttrpc::{self, r#async::TtrpcContext};
|
||||
|
||||
pub(crate) struct SandboxService {
|
||||
handler: Arc<RuntimeHandlerManager>,
|
||||
}
|
||||
|
||||
impl SandboxService {
|
||||
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
|
||||
Self { handler }
|
||||
}
|
||||
|
||||
async fn handler_message<TtrpcReq, TtrpcResp>(
|
||||
&self,
|
||||
ctx: &TtrpcContext,
|
||||
req: TtrpcReq,
|
||||
) -> ttrpc::Result<TtrpcResp>
|
||||
where
|
||||
SandboxRequest: TryFrom<TtrpcReq>,
|
||||
<SandboxRequest as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
|
||||
TtrpcResp: TryFrom<SandboxResponse>,
|
||||
<TtrpcResp as TryFrom<SandboxResponse>>::Error: std::fmt::Debug,
|
||||
{
|
||||
let r = req.try_into().map_err(|err| {
|
||||
ttrpc::Error::Others(format!("failed to translate from shim {:?}", err))
|
||||
})?;
|
||||
let logger = sl!().new(o!("stream id" => ctx.mh.stream_id));
|
||||
debug!(logger, "====> sandbox service {:?}", &r);
|
||||
let resp = self
|
||||
.handler
|
||||
.handler_sandbox_message(r)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ttrpc::Error::Others(format!("failed to handle sandbox message {:?}", err))
|
||||
})?;
|
||||
debug!(logger, "<==== sandbox service {:?}", &resp);
|
||||
resp.try_into()
|
||||
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_service {
|
||||
($($name: tt | $req: ty | $resp: ty),*) => {
|
||||
#[async_trait]
|
||||
impl sandbox_async::Sandbox for SandboxService {
|
||||
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
|
||||
self.handler_message(ctx, req).await
|
||||
})*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_service!(
|
||||
create_sandbox | sandbox_api::CreateSandboxRequest | sandbox_api::CreateSandboxResponse,
|
||||
start_sandbox | sandbox_api::StartSandboxRequest | sandbox_api::StartSandboxResponse,
|
||||
platform | sandbox_api::PlatformRequest | sandbox_api::PlatformResponse,
|
||||
stop_sandbox | sandbox_api::StopSandboxRequest | sandbox_api::StopSandboxResponse,
|
||||
wait_sandbox | sandbox_api::WaitSandboxRequest | sandbox_api::WaitSandboxResponse,
|
||||
sandbox_status | sandbox_api::SandboxStatusRequest | sandbox_api::SandboxStatusResponse,
|
||||
ping_sandbox | sandbox_api::PingRequest | sandbox_api::PingResponse,
|
||||
shutdown_sandbox | sandbox_api::ShutdownSandboxRequest | sandbox_api::ShutdownSandboxResponse
|
||||
);
|
Loading…
Reference in New Issue
Block a user