runtime-rs: simplify implementation of service crate

Simplify implementation of service crate.

Fixes: #7479

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-07-28 10:48:11 +08:00
parent 61a8eabf8e
commit 1a5f90dc3f
3 changed files with 18 additions and 32 deletions

View File

@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager {
} }
impl RuntimeHandlerManager { impl RuntimeHandlerManager {
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> { pub fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self { Ok(Self {
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new( inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
id, msg_sender, id, msg_sender,

View File

@ -94,6 +94,7 @@ async fn send_event(
} }
impl ServiceManager { impl ServiceManager {
// TODO: who manages lifecycle for `task_server_fd`?
pub async fn new( pub async fn new(
id: &str, id: &str,
containerd_binary: &str, containerd_binary: &str,
@ -102,11 +103,9 @@ impl ServiceManager {
task_server_fd: RawFd, task_server_fd: RawFd,
) -> Result<Self> { ) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new( let rt_mgr = RuntimeHandlerManager::new(id, sender)
RuntimeHandlerManager::new(id, sender) .context("new runtime handler")?;
.await let handler = Arc::new(rt_mgr);
.context("new runtime handler")?,
);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix(); task_server = task_server.set_domain_unix();
Ok(Self { Ok(Self {
@ -121,7 +120,7 @@ impl ServiceManager {
pub async fn run(&mut self) -> Result<()> { pub async fn run(&mut self) -> Result<()> {
info!(sl!(), "begin to run service"); info!(sl!(), "begin to run service");
self.start().await.context("start")?; self.start().await.context("start service")?;
info!(sl!(), "wait server message"); info!(sl!(), "wait server message");
let mut rx = self.receiver.take(); let mut rx = self.receiver.take();
@ -144,8 +143,7 @@ impl ServiceManager {
event, event,
) )
.await .await
.context("send event")?; .context("send event")
Ok(())
} }
}; };
@ -166,7 +164,6 @@ impl ServiceManager {
pub async fn cleanup(sid: &str) -> Result<()> { pub async fn cleanup(sid: &str) -> Result<()> {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = RuntimeHandlerManager::new(sid, sender) let handler = RuntimeHandlerManager::new(sid, sender)
.await
.context("new runtime handler")?; .context("new runtime handler")?;
handler.cleanup().await.context("runtime handler cleanup")?; handler.cleanup().await.context("runtime handler cleanup")?;
let temp_dir = [KATA_PATH, sid].join("/"); let temp_dir = [KATA_PATH, sid].join("/");
@ -183,31 +180,20 @@ impl ServiceManager {
} }
async fn start(&mut self) -> Result<()> { async fn start(&mut self) -> Result<()> {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) if let Some(t) = self.task_server.take() {
as Box<dyn shim_async::Task + Send + Sync>); let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
let task_server = self.task_server.take(); as Box<dyn shim_async::Task + Send + Sync>);
let task_server = match task_server { let mut t = t.register_service(shim_async::create_task(task_service));
Some(t) => { t.start().await.context("task server start")?;
let mut t = t.register_service(shim_async::create_task(task_service)); self.task_server = Some(t);
t.start().await.context("task server start")?; }
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(()) Ok(())
} }
async fn stop_listen(&mut self) -> Result<()> { async fn stop_listen(&mut self) -> Result<()> {
let task_server = self.task_server.take(); if let Some(t) = self.task_server.as_mut() {
let task_server = match task_server { t.stop_listen().await;
Some(mut t) => { }
t.stop_listen().await;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(()) Ok(())
} }
} }

View File

@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2;
async fn real_main() { async fn real_main() {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap(); let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
let req = Request::CreateContainer(ContainerConfig { let req = Request::CreateContainer(ContainerConfig {
container_id: "xxx".to_owned(), container_id: "xxx".to_owned(),