From 1a5f90dc3ff365f4b725d3c76cc42832d7db8a2a Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Fri, 28 Jul 2023 10:48:11 +0800 Subject: [PATCH 1/5] runtime-rs: simplify implementation of service crate Simplify implementation of service crate. Fixes: #7479 Signed-off-by: Jiang Liu --- src/runtime-rs/crates/runtimes/src/manager.rs | 2 +- src/runtime-rs/crates/service/src/manager.rs | 46 +++++++------------ src/runtime-rs/crates/shim-ctl/src/main.rs | 2 +- 3 files changed, 18 insertions(+), 32 deletions(-) diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index a34cddcb3c..1244b6835a 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager { } impl RuntimeHandlerManager { - pub async fn new(id: &str, msg_sender: Sender) -> Result { + pub fn new(id: &str, msg_sender: Sender) -> Result { Ok(Self { inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new( id, msg_sender, diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 83d4584f8d..b7dc565ca2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -94,6 +94,7 @@ async fn send_event( } impl ServiceManager { + // TODO: who manages lifecycle for `task_server_fd`? 
pub async fn new( id: &str, containerd_binary: &str, @@ -102,11 +103,9 @@ impl ServiceManager { task_server_fd: RawFd, ) -> Result { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); - let handler = Arc::new( - RuntimeHandlerManager::new(id, sender) - .await - .context("new runtime handler")?, - ); + let rt_mgr = RuntimeHandlerManager::new(id, sender) + .context("new runtime handler")?; + let handler = Arc::new(rt_mgr); let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; task_server = task_server.set_domain_unix(); Ok(Self { @@ -121,7 +120,7 @@ impl ServiceManager { pub async fn run(&mut self) -> Result<()> { info!(sl!(), "begin to run service"); - self.start().await.context("start")?; + self.start().await.context("start service")?; info!(sl!(), "wait server message"); let mut rx = self.receiver.take(); @@ -144,8 +143,7 @@ impl ServiceManager { event, ) .await - .context("send event")?; - Ok(()) + .context("send event") } }; @@ -166,7 +164,6 @@ impl ServiceManager { pub async fn cleanup(sid: &str) -> Result<()> { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); let handler = RuntimeHandlerManager::new(sid, sender) - .await .context("new runtime handler")?; handler.cleanup().await.context("runtime handler cleanup")?; let temp_dir = [KATA_PATH, sid].join("/"); @@ -183,31 +180,20 @@ impl ServiceManager { } async fn start(&mut self) -> Result<()> { - let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) - as Box); - let task_server = self.task_server.take(); - let task_server = match task_server { - Some(t) => { - let mut t = t.register_service(shim_async::create_task(task_service)); - t.start().await.context("task server start")?; - Some(t) - } - None => None, - }; - self.task_server = task_server; + if let Some(t) = self.task_server.take() { + let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) + as Box); + let mut t = t.register_service(shim_async::create_task(task_service)); + 
t.start().await.context("task server start")?; + self.task_server = Some(t); + } Ok(()) } async fn stop_listen(&mut self) -> Result<()> { - let task_server = self.task_server.take(); - let task_server = match task_server { - Some(mut t) => { - t.stop_listen().await; - Some(t) - } - None => None, - }; - self.task_server = task_server; + if let Some(t) = self.task_server.as_mut() { + t.stop_listen().await; + } Ok(()) } } diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index fd51521738..76506fec2d 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2; async fn real_main() { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); - let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap(); + let manager = RuntimeHandlerManager::new("xxx", sender).unwrap(); let req = Request::CreateContainer(ContainerConfig { container_id: "xxx".to_owned(), From 1cc1c81c9a92e558dc125880f4e3bad07624245c Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Fri, 28 Jul 2023 14:14:12 +0800 Subject: [PATCH 2/5] runtime-rs: fix possible bug in ServiceManager::run() Multiple instances of task service may get registered by ServiceManager::run(), fix it by making operation symmetric. 
Fixes: #7479 Signed-off-by: Jiang Liu --- src/runtime-rs/crates/service/src/manager.rs | 34 ++++++++++++-------- src/runtime-rs/crates/shim/src/shim_run.rs | 2 +- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index b7dc565ca2..9d6bbf6d58 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -18,6 +18,7 @@ use containerd_shim_protos::{ shim_async, }; use runtimes::RuntimeHandlerManager; +use shim_interface::KATA_PATH; use tokio::{ io::AsyncWriteExt, process::Command, @@ -26,9 +27,9 @@ use tokio::{ use ttrpc::asynchronous::Server; use crate::task_service::TaskService; + /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; -use shim_interface::KATA_PATH; pub struct ServiceManager { receiver: Option>, @@ -103,8 +104,7 @@ impl ServiceManager { task_server_fd: RawFd, ) -> Result { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); - let rt_mgr = RuntimeHandlerManager::new(id, sender) - .context("new runtime handler")?; + let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?; let handler = Arc::new(rt_mgr); let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; task_server = task_server.set_domain_unix(); @@ -118,9 +118,10 @@ impl ServiceManager { }) } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { info!(sl!(), "begin to run service"); - self.start().await.context("start service")?; + self.registry_service().context("registry service")?; + self.start_service().await.context("start service")?; info!(sl!(), "wait server message"); let mut rx = self.receiver.take(); @@ -128,10 +129,10 @@ impl ServiceManager { while let Some(r) = rx.recv().await { info!(sl!(), "receive action {:?}", &r.action); let result = match r.action { - Action::Start => self.start().await.context("start listen"), - Action::Stop => 
self.stop_listen().await.context("stop listen"), + Action::Start => self.start_service().await.context("start listen"), + Action::Stop => self.stop_service().await.context("stop listen"), Action::Shutdown => { - self.stop_listen().await.context("stop listen")?; + self.stop_service().await.context("stop listen")?; break; } Action::Event(event) => { @@ -163,8 +164,7 @@ impl ServiceManager { pub async fn cleanup(sid: &str) -> Result<()> { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); - let handler = RuntimeHandlerManager::new(sid, sender) - .context("new runtime handler")?; + let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?; handler.cleanup().await.context("runtime handler cleanup")?; let temp_dir = [KATA_PATH, sid].join("/"); if std::fs::metadata(temp_dir.as_str()).is_ok() { @@ -179,18 +179,24 @@ impl ServiceManager { Ok(()) } - async fn start(&mut self) -> Result<()> { + fn registry_service(&mut self) -> Result<()> { if let Some(t) = self.task_server.take() { let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) as Box); - let mut t = t.register_service(shim_async::create_task(task_service)); - t.start().await.context("task server start")?; + let t = t.register_service(shim_async::create_task(task_service)); self.task_server = Some(t); } Ok(()) } - async fn stop_listen(&mut self) -> Result<()> { + async fn start_service(&mut self) -> Result<()> { + if let Some(t) = self.task_server.as_mut() { + t.start().await.context("task server start")?; + } + Ok(()) + } + + async fn stop_service(&mut self) -> Result<()> { if let Some(t) = self.task_server.as_mut() { t.stop_listen().await; } diff --git a/src/runtime-rs/crates/shim/src/shim_run.rs b/src/runtime-rs/crates/shim/src/shim_run.rs index 5445ac635e..64e81ca407 100644 --- a/src/runtime-rs/crates/shim/src/shim_run.rs +++ b/src/runtime-rs/crates/shim/src/shim_run.rs @@ -46,7 +46,7 @@ impl ShimExecutor { self.args.validate(false).context("validate")?; 
let server_fd = get_server_fd().context("get server fd")?; - let mut service_manager = service::ServiceManager::new( + let service_manager = service::ServiceManager::new( &self.args.id, &self.args.publish_binary, &self.args.address, From 458e1bc7125ddf3ca3b32b406dd1e9df56fae3b9 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Fri, 28 Jul 2023 14:27:30 +0800 Subject: [PATCH 3/5] runtime-rs: make send_event() a method of ServiceManager Simplify implementation by making send_event() a method of ServiceManager. Fixes: #7479 Signed-off-by: Jiang Liu --- src/runtime-rs/crates/service/src/manager.rs | 86 +++++++++----------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 9d6bbf6d58..f224675dc2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -53,47 +53,6 @@ impl std::fmt::Debug for ServiceManager { } } -async fn send_event( - containerd_binary: String, - address: String, - namespace: String, - event: Arc, -) -> Result<()> { - let any = Any { - type_url: event.type_url(), - value: event.value().context("get event value")?, - ..Default::default() - }; - let data = any.write_to_bytes().context("write to any")?; - let mut child = Command::new(containerd_binary) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .args([ - "--address", - &address, - "publish", - "--topic", - &event.r#type(), - "--namespace", - &namespace, - ]) - .spawn() - .context("spawn containerd cmd to publish event")?; - - let stdin = child.stdin.as_mut().context("failed to open stdin")?; - stdin - .write_all(&data) - .await - .context("failed to write to stdin")?; - let output = child - .wait_with_output() - .await - .context("failed to read stdout")?; - info!(sl!(), "get output: {:?}", output); - Ok(()) -} - impl ServiceManager { // TODO: who manages lifecycle for `task_server_fd`? 
pub async fn new( @@ -137,14 +96,7 @@ impl ServiceManager { } Action::Event(event) => { info!(sl!(), "get event {:?}", &event); - send_event( - self.binary.clone(), - self.address.clone(), - self.namespace.clone(), - event, - ) - .await - .context("send event") + self.send_event(event).await.context("send event") } }; @@ -202,4 +154,40 @@ impl ServiceManager { } Ok(()) } + + async fn send_event(&self, event: Arc) -> Result<()> { + let any = Any { + type_url: event.type_url(), + value: event.value().context("get event value")?, + ..Default::default() + }; + let data = any.write_to_bytes().context("write to any")?; + let mut child = Command::new(&self.binary) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .args([ + "--address", + &self.address, + "publish", + "--topic", + &event.r#type(), + "--namespace", + &self.namespace, + ]) + .spawn() + .context("spawn containerd cmd to publish event")?; + + let stdin = child.stdin.as_mut().context("failed to open stdin")?; + stdin + .write_all(&data) + .await + .context("failed to write to stdin")?; + let output = child + .wait_with_output() + .await + .context("failed to read stdout")?; + info!(sl!(), "get output: {:?}", output); + Ok(()) + } } From 62e328ca5c110778370c4b279fdf5eb5f7d34a00 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Fri, 28 Jul 2023 15:05:34 +0800 Subject: [PATCH 4/5] runtime-rs: refine implementation of TaskService Refine implementation of TaskService, making handler_message() as a method. 
Fixes: #7479 Signed-off-by: Jiang Liu --- .../crates/service/src/task_service.rs | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index a612796c71..9db1bcbe4d 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -24,31 +24,31 @@ impl TaskService { pub(crate) fn new(handler: Arc) -> Self { Self { handler } } -} -async fn handler_message( - s: &RuntimeHandlerManager, - ctx: &TtrpcContext, - req: TtrpcReq, -) -> ttrpc::Result -where - Request: TryFrom, - >::Error: std::fmt::Debug, - TtrpcResp: TryFrom, - >::Error: std::fmt::Debug, -{ - let r = req - .try_into() - .map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?; - let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); - debug!(logger, "====> task service {:?}", &r); - let resp = s - .handler_message(r) - .await - .map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?; - debug!(logger, "<==== task service {:?}", &resp); - resp.try_into() - .map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err))) + async fn handler_message( + &self, + ctx: &TtrpcContext, + req: TtrpcReq, + ) -> ttrpc::Result + where + Request: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, + { + let r = req.try_into().map_err(|err| { + ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) + })?; + let logger = sl!().new(o!("stream id" => ctx.mh.stream_id)); + debug!(logger, "====> task service {:?}", &r); + let resp = + self.handler.handler_message(r).await.map_err(|err| { + ttrpc::Error::Others(format!("failed to handler message {:?}", err)) + })?; + debug!(logger, "<==== task service {:?}", &resp); + resp.try_into() + .map_err(|err| ttrpc::Error::Others(format!("failed to 
translate to shim {:?}", err))) + } } macro_rules! impl_service { @@ -56,7 +56,7 @@ macro_rules! impl_service { #[async_trait] impl shim_async::Task for TaskService { $(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> { - handler_message(&self.handler, ctx, req).await + self.handler_message(ctx, req).await })* } }; From b3901c46d67a50f82c99f2ff0779a2d38471eb56 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 30 Jul 2023 23:45:40 +0800 Subject: [PATCH 5/5] runtime-rs: ignore errors while cleaning up sandbox resources Ignore errors while cleaning up sandbox resources as much as we can. Signed-off-by: Jiang Liu --- src/runtime-rs/crates/service/src/manager.rs | 21 ++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index f224675dc2..46ffc18db2 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -117,17 +117,18 @@ impl ServiceManager { pub async fn cleanup(sid: &str) -> Result<()> { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?; - handler.cleanup().await.context("runtime handler cleanup")?; - let temp_dir = [KATA_PATH, sid].join("/"); - if std::fs::metadata(temp_dir.as_str()).is_ok() { - // try to remove dir and skip the result - fs::remove_dir_all(temp_dir) - .map_err(|err| { - warn!(sl!(), "failed to clean up sandbox tmp dir"); - err - }) - .ok(); + if let Err(e) = handler.cleanup().await { + warn!(sl!(), "failed to clean up runtime state, {}", e); } + + let temp_dir = [KATA_PATH, sid].join("/"); + if fs::metadata(temp_dir.as_str()).is_ok() { + // try to remove dir and skip the result + if let Err(e) = fs::remove_dir_all(temp_dir) { + warn!(sl!(), "failed to clean up sandbox tmp dir, {}", e); + } + } + Ok(()) }