diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index a34cddcb3c..1244b6835a 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager { } impl RuntimeHandlerManager { - pub async fn new(id: &str, msg_sender: Sender) -> Result { + pub fn new(id: &str, msg_sender: Sender) -> Result { Ok(Self { inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new( id, msg_sender, diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 83d4584f8d..46ffc18db2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -18,6 +18,7 @@ use containerd_shim_protos::{ shim_async, }; use runtimes::RuntimeHandlerManager; +use shim_interface::KATA_PATH; use tokio::{ io::AsyncWriteExt, process::Command, @@ -26,9 +27,9 @@ use tokio::{ use ttrpc::asynchronous::Server; use crate::task_service::TaskService; + /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; -use shim_interface::KATA_PATH; pub struct ServiceManager { receiver: Option>, @@ -52,48 +53,8 @@ impl std::fmt::Debug for ServiceManager { } } -async fn send_event( - containerd_binary: String, - address: String, - namespace: String, - event: Arc, -) -> Result<()> { - let any = Any { - type_url: event.type_url(), - value: event.value().context("get event value")?, - ..Default::default() - }; - let data = any.write_to_bytes().context("write to any")?; - let mut child = Command::new(containerd_binary) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .args([ - "--address", - &address, - "publish", - "--topic", - &event.r#type(), - "--namespace", - &namespace, - ]) - .spawn() - .context("spawn containerd cmd to publish event")?; - - let stdin = child.stdin.as_mut().context("failed to open stdin")?; - stdin - .write_all(&data) - .await - .context("failed to write to stdin")?; - let output = child - .wait_with_output() - .await - .context("failed to read stdout")?; - info!(sl!(), "get output: {:?}", output); - Ok(()) -} - impl ServiceManager { + // TODO: who manages lifecycle for `task_server_fd`? pub async fn new( id: &str, containerd_binary: &str, @@ -102,11 +63,8 @@ impl ServiceManager { task_server_fd: RawFd, ) -> Result { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); - let handler = Arc::new( - RuntimeHandlerManager::new(id, sender) - .await - .context("new runtime handler")?, - ); + let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?; + let handler = Arc::new(rt_mgr); let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; task_server = task_server.set_domain_unix(); Ok(Self { @@ -119,9 +77,10 @@ impl ServiceManager { }) } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { info!(sl!(), "begin to run service"); - self.start().await.context("start")?; + self.registry_service().context("registry service")?; + self.start_service().await.context("start service")?; info!(sl!(), "wait server message"); let mut rx = self.receiver.take(); @@ -129,23 +88,15 @@ impl ServiceManager { while let Some(r) = rx.recv().await { info!(sl!(), "receive action {:?}", &r.action); let result = match r.action { - Action::Start => self.start().await.context("start listen"), - Action::Stop => self.stop_listen().await.context("stop listen"), + Action::Start => self.start_service().await.context("start listen"), + Action::Stop => self.stop_service().await.context("stop listen"), Action::Shutdown => { - self.stop_listen().await.context("stop listen")?; + self.stop_service().await.context("stop listen")?; break; } Action::Event(event) => { info!(sl!(), "get event {:?}", &event); - send_event( - self.binary.clone(), - self.address.clone(), - self.namespace.clone(), - event, - ) - .await - .context("send event")?; - Ok(()) + self.send_event(event).await.context("send event") } }; @@ -165,49 +116,79 @@ impl ServiceManager { pub async fn cleanup(sid: &str) -> Result<()> { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); - let handler = RuntimeHandlerManager::new(sid, sender) - .await - .context("new runtime handler")?; - handler.cleanup().await.context("runtime handler cleanup")?; + let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?; + if let Err(e) = handler.cleanup().await { + warn!(sl!(), "failed to clean up runtime state, {}", e); + } + let temp_dir = [KATA_PATH, sid].join("/"); - if std::fs::metadata(temp_dir.as_str()).is_ok() { + if fs::metadata(temp_dir.as_str()).is_ok() { // try to remove dir and skip the result - fs::remove_dir_all(temp_dir) - .map_err(|err| { - warn!(sl!(), "failed to clean up sandbox tmp dir"); - err - }) - .ok(); + if let Err(e) = fs::remove_dir_all(temp_dir) { + warn!(sl!(), "failed to clean up sandbox tmp dir, {}", e); + } + } + + Ok(()) + } + + fn registry_service(&mut self) -> Result<()> { + if let Some(t) = self.task_server.take() { + let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) + as Box); + let t = t.register_service(shim_async::create_task(task_service)); + self.task_server = Some(t); } Ok(()) } - async fn start(&mut self) -> Result<()> { - let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) - as Box); - let task_server = self.task_server.take(); - let task_server = match task_server { - Some(t) => { - let mut t = t.register_service(shim_async::create_task(task_service)); - t.start().await.context("task server start")?; - Some(t) - } - None => None, - }; - self.task_server = task_server; + async fn start_service(&mut self) -> Result<()> { + if let Some(t) = self.task_server.as_mut() { + t.start().await.context("task server start")?; + } Ok(()) } - async fn stop_listen(&mut self) -> Result<()> { - let task_server = self.task_server.take(); - let task_server = match task_server { - Some(mut t) => { - t.stop_listen().await; - Some(t) - } - None => None, + async fn stop_service(&mut self) -> Result<()> { + if let Some(t) = self.task_server.as_mut() { + t.stop_listen().await; + } + Ok(()) + } + + async fn send_event(&self, event: Arc) -> Result<()> { + let any = Any { + type_url: event.type_url(), + value: event.value().context("get event value")?, + ..Default::default() }; - self.task_server = task_server; + let data = any.write_to_bytes().context("write to any")?; + let mut child = Command::new(&self.binary) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .args([ + "--address", + &self.address, + "publish", + "--topic", + &event.r#type(), + "--namespace", + &self.namespace, + ]) + .spawn() + .context("spawn containerd cmd to publish event")?; + + let stdin = child.stdin.as_mut().context("failed to open stdin")?; + stdin + .write_all(&data) + .await + .context("failed to write to stdin")?; + let output = child + .wait_with_output() + .await + .context("failed to read stdout")?; + info!(sl!(), "get output: {:?}", output); Ok(()) } } diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index a612796c71..9db1bcbe4d 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -24,31 +24,31 @@ impl TaskService { pub(crate) fn new(handler: Arc) -> Self { Self { handler } } -} -async fn handler_message( - s: &RuntimeHandlerManager, - ctx: &TtrpcContext, - req: TtrpcReq, -) -> ttrpc::Result -where - Request: TryFrom, - >::Error: std::fmt::Debug, - TtrpcResp: TryFrom, - >::Error: std::fmt::Debug, -{ - let r = req - .try_into() - .map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?; - let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); - debug!(logger, "====> task service {:?}", &r); - let resp = s - .handler_message(r) - .await - .map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?; - debug!(logger, "<==== task service {:?}", &resp); - resp.try_into() - .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) + async fn handler_message( + &self, + ctx: &TtrpcContext, + req: TtrpcReq, + ) -> ttrpc::Result + where + Request: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, + { + let r = req.try_into().map_err(|err| { + ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) + })?; + let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); + debug!(logger, "====> task service {:?}", &r); + let resp = + self.handler.handler_message(r).await.map_err(|err| { + ttrpc::Error::Others(format!("failed to handler message {:?}", err)) + })?; + debug!(logger, "<==== task service {:?}", &resp); + resp.try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) + } } macro_rules! impl_service { @@ -56,7 +56,7 @@ macro_rules! impl_service { #[async_trait] impl shim_async::Task for TaskService { $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { - handler_message(&self.handler, ctx, req).await + self.handler_message(ctx, req).await })* } }; diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index fd51521738..76506fec2d 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2; async fn real_main() { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); - let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap(); + let manager = RuntimeHandlerManager::new("xxx", sender).unwrap(); let req = Request::CreateContainer(ContainerConfig { container_id: "xxx".to_owned(), diff --git a/src/runtime-rs/crates/shim/src/shim_run.rs b/src/runtime-rs/crates/shim/src/shim_run.rs index 5445ac635e..64e81ca407 100644 --- a/src/runtime-rs/crates/shim/src/shim_run.rs +++ b/src/runtime-rs/crates/shim/src/shim_run.rs @@ -46,7 +46,7 @@ impl ShimExecutor { self.args.validate(false).context("validate")?; let server_fd = get_server_fd().context("get server fd")?; - let mut service_manager = service::ServiceManager::new( + let service_manager = service::ServiceManager::new( &self.args.id, &self.args.publish_binary, &self.args.address,