Merge pull request #7480 from jiangliu/rt-service

Simplify implementation of runtime-rs/service
This commit is contained in:
Chao Wu
2023-08-01 16:05:33 +08:00
committed by GitHub
5 changed files with 103 additions and 122 deletions

View File

@@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager {
} }
impl RuntimeHandlerManager { impl RuntimeHandlerManager {
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> { pub fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self { Ok(Self {
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new( inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
id, msg_sender, id, msg_sender,

View File

@@ -18,6 +18,7 @@ use containerd_shim_protos::{
shim_async, shim_async,
}; };
use runtimes::RuntimeHandlerManager; use runtimes::RuntimeHandlerManager;
use shim_interface::KATA_PATH;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
process::Command, process::Command,
@@ -26,9 +27,9 @@ use tokio::{
use ttrpc::asynchronous::Server; use ttrpc::asynchronous::Server;
use crate::task_service::TaskService; use crate::task_service::TaskService;
/// message buffer size /// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8; const MESSAGE_BUFFER_SIZE: usize = 8;
use shim_interface::KATA_PATH;
pub struct ServiceManager { pub struct ServiceManager {
receiver: Option<Receiver<Message>>, receiver: Option<Receiver<Message>>,
@@ -52,48 +53,8 @@ impl std::fmt::Debug for ServiceManager {
} }
} }
async fn send_event(
containerd_binary: String,
address: String,
namespace: String,
event: Arc<dyn Event>,
) -> Result<()> {
let any = Any {
type_url: event.type_url(),
value: event.value().context("get event value")?,
..Default::default()
};
let data = any.write_to_bytes().context("write to any")?;
let mut child = Command::new(containerd_binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args([
"--address",
&address,
"publish",
"--topic",
&event.r#type(),
"--namespace",
&namespace,
])
.spawn()
.context("spawn containerd cmd to publish event")?;
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
stdin
.write_all(&data)
.await
.context("failed to write to stdin")?;
let output = child
.wait_with_output()
.await
.context("failed to read stdout")?;
info!(sl!(), "get output: {:?}", output);
Ok(())
}
impl ServiceManager { impl ServiceManager {
// TODO: who manages lifecycle for `task_server_fd`?
pub async fn new( pub async fn new(
id: &str, id: &str,
containerd_binary: &str, containerd_binary: &str,
@@ -102,11 +63,8 @@ impl ServiceManager {
task_server_fd: RawFd, task_server_fd: RawFd,
) -> Result<Self> { ) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new( let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?;
RuntimeHandlerManager::new(id, sender) let handler = Arc::new(rt_mgr);
.await
.context("new runtime handler")?,
);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix(); task_server = task_server.set_domain_unix();
Ok(Self { Ok(Self {
@@ -119,9 +77,10 @@ impl ServiceManager {
}) })
} }
pub async fn run(&mut self) -> Result<()> { pub async fn run(mut self) -> Result<()> {
info!(sl!(), "begin to run service"); info!(sl!(), "begin to run service");
self.start().await.context("start")?; self.registry_service().context("registry service")?;
self.start_service().await.context("start service")?;
info!(sl!(), "wait server message"); info!(sl!(), "wait server message");
let mut rx = self.receiver.take(); let mut rx = self.receiver.take();
@@ -129,23 +88,15 @@ impl ServiceManager {
while let Some(r) = rx.recv().await { while let Some(r) = rx.recv().await {
info!(sl!(), "receive action {:?}", &r.action); info!(sl!(), "receive action {:?}", &r.action);
let result = match r.action { let result = match r.action {
Action::Start => self.start().await.context("start listen"), Action::Start => self.start_service().await.context("start listen"),
Action::Stop => self.stop_listen().await.context("stop listen"), Action::Stop => self.stop_service().await.context("stop listen"),
Action::Shutdown => { Action::Shutdown => {
self.stop_listen().await.context("stop listen")?; self.stop_service().await.context("stop listen")?;
break; break;
} }
Action::Event(event) => { Action::Event(event) => {
info!(sl!(), "get event {:?}", &event); info!(sl!(), "get event {:?}", &event);
send_event( self.send_event(event).await.context("send event")
self.binary.clone(),
self.address.clone(),
self.namespace.clone(),
event,
)
.await
.context("send event")?;
Ok(())
} }
}; };
@@ -165,49 +116,79 @@ impl ServiceManager {
pub async fn cleanup(sid: &str) -> Result<()> { pub async fn cleanup(sid: &str) -> Result<()> {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = RuntimeHandlerManager::new(sid, sender) let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?;
.await if let Err(e) = handler.cleanup().await {
.context("new runtime handler")?; warn!(sl!(), "failed to clean up runtime state, {}", e);
handler.cleanup().await.context("runtime handler cleanup")?; }
let temp_dir = [KATA_PATH, sid].join("/"); let temp_dir = [KATA_PATH, sid].join("/");
if std::fs::metadata(temp_dir.as_str()).is_ok() { if fs::metadata(temp_dir.as_str()).is_ok() {
// try to remove dir and skip the result // try to remove dir and skip the result
fs::remove_dir_all(temp_dir) if let Err(e) = fs::remove_dir_all(temp_dir) {
.map_err(|err| { warn!(sl!(), "failed to clean up sandbox tmp dir, {}", e);
warn!(sl!(), "failed to clean up sandbox tmp dir"); }
err }
})
.ok(); Ok(())
}
fn registry_service(&mut self) -> Result<()> {
if let Some(t) = self.task_server.take() {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let t = t.register_service(shim_async::create_task(task_service));
self.task_server = Some(t);
} }
Ok(()) Ok(())
} }
async fn start(&mut self) -> Result<()> { async fn start_service(&mut self) -> Result<()> {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) if let Some(t) = self.task_server.as_mut() {
as Box<dyn shim_async::Task + Send + Sync>); t.start().await.context("task server start")?;
let task_server = self.task_server.take(); }
let task_server = match task_server {
Some(t) => {
let mut t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(()) Ok(())
} }
async fn stop_listen(&mut self) -> Result<()> { async fn stop_service(&mut self) -> Result<()> {
let task_server = self.task_server.take(); if let Some(t) = self.task_server.as_mut() {
let task_server = match task_server { t.stop_listen().await;
Some(mut t) => { }
t.stop_listen().await; Ok(())
Some(t) }
}
None => None, async fn send_event(&self, event: Arc<dyn Event>) -> Result<()> {
let any = Any {
type_url: event.type_url(),
value: event.value().context("get event value")?,
..Default::default()
}; };
self.task_server = task_server; let data = any.write_to_bytes().context("write to any")?;
let mut child = Command::new(&self.binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args([
"--address",
&self.address,
"publish",
"--topic",
&event.r#type(),
"--namespace",
&self.namespace,
])
.spawn()
.context("spawn containerd cmd to publish event")?;
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
stdin
.write_all(&data)
.await
.context("failed to write to stdin")?;
let output = child
.wait_with_output()
.await
.context("failed to read stdout")?;
info!(sl!(), "get output: {:?}", output);
Ok(()) Ok(())
} }
} }

View File

@@ -24,31 +24,31 @@ impl TaskService {
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self { pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
Self { handler } Self { handler }
} }
}
async fn handler_message<TtrpcReq, TtrpcResp>( async fn handler_message<TtrpcReq, TtrpcResp>(
s: &RuntimeHandlerManager, &self,
ctx: &TtrpcContext, ctx: &TtrpcContext,
req: TtrpcReq, req: TtrpcReq,
) -> ttrpc::Result<TtrpcResp> ) -> ttrpc::Result<TtrpcResp>
where where
Request: TryFrom<TtrpcReq>, Request: TryFrom<TtrpcReq>,
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug, <Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
TtrpcResp: TryFrom<Response>, TtrpcResp: TryFrom<Response>,
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug, <TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug,
{ {
let r = req let r = req.try_into().map_err(|err| {
.try_into() ttrpc::Error::Others(format!("failed to translate from shim {:?}", err))
.map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?; })?;
let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); let logger = sl!().new(o!("stream id" => ctx.mh.stream_id));
debug!(logger, "====> task service {:?}", &r); debug!(logger, "====> task service {:?}", &r);
let resp = s let resp =
.handler_message(r) self.handler.handler_message(r).await.map_err(|err| {
.await ttrpc::Error::Others(format!("failed to handler message {:?}", err))
.map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?; })?;
debug!(logger, "<==== task service {:?}", &resp); debug!(logger, "<==== task service {:?}", &resp);
resp.try_into() resp.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))
}
} }
macro_rules! impl_service { macro_rules! impl_service {
@@ -56,7 +56,7 @@ macro_rules! impl_service {
#[async_trait] #[async_trait]
impl shim_async::Task for TaskService { impl shim_async::Task for TaskService {
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
handler_message(&self.handler, ctx, req).await self.handler_message(ctx, req).await
})* })*
} }
}; };

View File

@@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2;
async fn real_main() { async fn real_main() {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap(); let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
let req = Request::CreateContainer(ContainerConfig { let req = Request::CreateContainer(ContainerConfig {
container_id: "xxx".to_owned(), container_id: "xxx".to_owned(),

View File

@@ -46,7 +46,7 @@ impl ShimExecutor {
self.args.validate(false).context("validate")?; self.args.validate(false).context("validate")?;
let server_fd = get_server_fd().context("get server fd")?; let server_fd = get_server_fd().context("get server fd")?;
let mut service_manager = service::ServiceManager::new( let service_manager = service::ServiceManager::new(
&self.args.id, &self.args.id,
&self.args.publish_binary, &self.args.publish_binary,
&self.args.address, &self.args.address,