diff --git a/src/runtime-rs/crates/service/src/lib.rs b/src/runtime-rs/crates/service/src/lib.rs index 7049760ec8..c72fc78e2d 100644 --- a/src/runtime-rs/crates/service/src/lib.rs +++ b/src/runtime-rs/crates/service/src/lib.rs @@ -11,6 +11,7 @@ logging::logger_with_subsystem!(sl, "service"); mod event; mod manager; +mod sandbox_service; mod task_service; pub use manager::ServiceManager; diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 8542a0c9e9..ea0b0fbab2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -17,7 +17,9 @@ use tokio::sync::mpsc::{channel, Receiver}; use ttrpc::asynchronous::Server; use crate::event::{new_event_publisher, Forwarder}; +use crate::sandbox_service::SandboxService; use crate::task_service::TaskService; +use containerd_shim_protos::sandbox_async; /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; @@ -136,25 +138,29 @@ impl ServiceManager { } fn registry_service(&mut self) -> Result<()> { - if let Some(t) = self.server.take() { + if let Some(s) = self.server.take() { + let sandbox_service = Arc::new(Box::new(SandboxService::new(self.handler.clone())) + as Box); + let s = s.register_service(sandbox_async::create_sandbox(sandbox_service)); + let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) as Box); - let t = t.register_service(shim_async::create_task(task_service)); - self.server = Some(t); + let s = s.register_service(shim_async::create_task(task_service)); + self.server = Some(s); } Ok(()) } async fn start_service(&mut self) -> Result<()> { - if let Some(t) = self.server.as_mut() { - t.start().await.context("task server start")?; + if let Some(s) = self.server.as_mut() { + s.start().await.context("task server start")?; } Ok(()) } async fn stop_service(&mut self) -> Result<()> { - if let Some(t) = self.server.as_mut() { - t.stop_listen().await; + if let Some(s) = self.server.as_mut() { + s.stop_listen().await; } Ok(()) } diff --git a/src/runtime-rs/crates/service/src/sandbox_service.rs b/src/runtime-rs/crates/service/src/sandbox_service.rs new file mode 100644 index 0000000000..90bcf9b584 --- /dev/null +++ b/src/runtime-rs/crates/service/src/sandbox_service.rs @@ -0,0 +1,76 @@ +// Copyright (c) 2019-2025 Alibaba Cloud +// Copyright (c) 2019-2025 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, +}; + +use async_trait::async_trait; +use common::types::{SandboxRequest, SandboxResponse}; +use containerd_shim_protos::{sandbox_api, sandbox_async}; +use runtimes::RuntimeHandlerManager; +use ttrpc::{self, r#async::TtrpcContext}; + +pub(crate) struct SandboxService { + handler: Arc, +} + +impl SandboxService { + pub(crate) fn new(handler: Arc) -> Self { + Self { handler } + } + + async fn handler_message( + &self, + ctx: &TtrpcContext, + req: TtrpcReq, + ) -> ttrpc::Result + where + SandboxRequest: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, + { + let r = req.try_into().map_err(|err| { + ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) + })?; + let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); + debug!(logger, "====> sandbox service {:?}", &r); + let resp = self + .handler + .handler_sandbox_message(r) + .await + .map_err(|err| { + ttrpc::Error::Others(format!("failed to handle sandbox message {:?}", err)) + })?; + debug!(logger, "<==== sandbox service {:?}", &resp); + resp.try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) + } +} + +macro_rules! impl_service { + ($($name: tt | $req: ty | $resp: ty),*) => { + #[async_trait] + impl sandbox_async::Sandbox for SandboxService { + $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { + self.handler_message(ctx, req).await + })* + } + }; +} + +impl_service!( + create_sandbox | sandbox_api::CreateSandboxRequest | sandbox_api::CreateSandboxResponse, + start_sandbox | sandbox_api::StartSandboxRequest | sandbox_api::StartSandboxResponse, + platform | sandbox_api::PlatformRequest | sandbox_api::PlatformResponse, + stop_sandbox | sandbox_api::StopSandboxRequest | sandbox_api::StopSandboxResponse, + wait_sandbox | sandbox_api::WaitSandboxRequest | sandbox_api::WaitSandboxResponse, + sandbox_status | sandbox_api::SandboxStatusRequest | sandbox_api::SandboxStatusResponse, + ping_sandbox | sandbox_api::PingRequest | sandbox_api::PingResponse, + shutdown_sandbox | sandbox_api::ShutdownSandboxRequest | sandbox_api::ShutdownSandboxResponse +);