diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index b7dc565ca2..9d6bbf6d58 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -18,6 +18,7 @@ use containerd_shim_protos::{ shim_async, }; use runtimes::RuntimeHandlerManager; +use shim_interface::KATA_PATH; use tokio::{ io::AsyncWriteExt, process::Command, @@ -26,9 +27,9 @@ use tokio::{ use ttrpc::asynchronous::Server; use crate::task_service::TaskService; + /// message buffer size const MESSAGE_BUFFER_SIZE: usize = 8; -use shim_interface::KATA_PATH; pub struct ServiceManager { receiver: Option>, @@ -103,8 +104,7 @@ impl ServiceManager { task_server_fd: RawFd, ) -> Result { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); - let rt_mgr = RuntimeHandlerManager::new(id, sender) - .context("new runtime handler")?; + let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?; let handler = Arc::new(rt_mgr); let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; task_server = task_server.set_domain_unix(); @@ -118,9 +118,10 @@ impl ServiceManager { }) } - pub async fn run(&mut self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { info!(sl!(), "begin to run service"); - self.start().await.context("start service")?; + self.registry_service().context("registry service")?; + self.start_service().await.context("start service")?; info!(sl!(), "wait server message"); let mut rx = self.receiver.take(); @@ -128,10 +129,10 @@ impl ServiceManager { while let Some(r) = rx.recv().await { info!(sl!(), "receive action {:?}", &r.action); let result = match r.action { - Action::Start => self.start().await.context("start listen"), - Action::Stop => self.stop_listen().await.context("stop listen"), + Action::Start => self.start_service().await.context("start listen"), + Action::Stop => self.stop_service().await.context("stop listen"), Action::Shutdown => { - self.stop_listen().await.context("stop listen")?; + self.stop_service().await.context("stop listen")?; break; } Action::Event(event) => { @@ -163,8 +164,7 @@ impl ServiceManager { pub async fn cleanup(sid: &str) -> Result<()> { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); - let handler = RuntimeHandlerManager::new(sid, sender) - .context("new runtime handler")?; + let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?; handler.cleanup().await.context("runtime handler cleanup")?; let temp_dir = [KATA_PATH, sid].join("/"); if std::fs::metadata(temp_dir.as_str()).is_ok() { @@ -179,18 +179,24 @@ impl ServiceManager { Ok(()) } - async fn start(&mut self) -> Result<()> { + fn registry_service(&mut self) -> Result<()> { if let Some(t) = self.task_server.take() { let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) as Box); - let mut t = t.register_service(shim_async::create_task(task_service)); - t.start().await.context("task server start")?; + let t = t.register_service(shim_async::create_task(task_service)); self.task_server = Some(t); } Ok(()) } - async fn stop_listen(&mut self) -> Result<()> { + async fn start_service(&mut self) -> Result<()> { + if let Some(t) = self.task_server.as_mut() { + t.start().await.context("task server start")?; + } + Ok(()) + } + + async fn stop_service(&mut self) -> Result<()> { if let Some(t) = self.task_server.as_mut() { t.stop_listen().await; } diff --git a/src/runtime-rs/crates/shim/src/shim_run.rs b/src/runtime-rs/crates/shim/src/shim_run.rs index 5445ac635e..64e81ca407 100644 --- a/src/runtime-rs/crates/shim/src/shim_run.rs +++ b/src/runtime-rs/crates/shim/src/shim_run.rs @@ -46,7 +46,7 @@ impl ShimExecutor { self.args.validate(false).context("validate")?; let server_fd = get_server_fd().context("get server fd")?; - let mut service_manager = service::ServiceManager::new( + let service_manager = service::ServiceManager::new( &self.args.id, &self.args.publish_binary, &self.args.address,