runtime-rs: simplify implementation of service crate

Simplify implementation of service crate.

Fixes: #7479

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-07-28 10:48:11 +08:00
parent 61a8eabf8e
commit 1a5f90dc3f
3 changed files with 18 additions and 32 deletions

View File

@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager {
}
impl RuntimeHandlerManager {
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
pub fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self {
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
id, msg_sender,

View File

@ -94,6 +94,7 @@ async fn send_event(
}
impl ServiceManager {
// TODO: who manages lifecycle for `task_server_fd`?
pub async fn new(
id: &str,
containerd_binary: &str,
@ -102,11 +103,9 @@ impl ServiceManager {
task_server_fd: RawFd,
) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new(
RuntimeHandlerManager::new(id, sender)
.await
.context("new runtime handler")?,
);
let rt_mgr = RuntimeHandlerManager::new(id, sender)
.context("new runtime handler")?;
let handler = Arc::new(rt_mgr);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix();
Ok(Self {
@ -121,7 +120,7 @@ impl ServiceManager {
pub async fn run(&mut self) -> Result<()> {
info!(sl!(), "begin to run service");
self.start().await.context("start")?;
self.start().await.context("start service")?;
info!(sl!(), "wait server message");
let mut rx = self.receiver.take();
@ -144,8 +143,7 @@ impl ServiceManager {
event,
)
.await
.context("send event")?;
Ok(())
.context("send event")
}
};
@ -166,7 +164,6 @@ impl ServiceManager {
pub async fn cleanup(sid: &str) -> Result<()> {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = RuntimeHandlerManager::new(sid, sender)
.await
.context("new runtime handler")?;
handler.cleanup().await.context("runtime handler cleanup")?;
let temp_dir = [KATA_PATH, sid].join("/");
@ -183,31 +180,20 @@ impl ServiceManager {
}
async fn start(&mut self) -> Result<()> {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let task_server = self.task_server.take();
let task_server = match task_server {
Some(t) => {
let mut t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
Some(t)
}
None => None,
};
self.task_server = task_server;
if let Some(t) = self.task_server.take() {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let mut t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
self.task_server = Some(t);
}
Ok(())
}
async fn stop_listen(&mut self) -> Result<()> {
let task_server = self.task_server.take();
let task_server = match task_server {
Some(mut t) => {
t.stop_listen().await;
Some(t)
}
None => None,
};
self.task_server = task_server;
if let Some(t) = self.task_server.as_mut() {
t.stop_listen().await;
}
Ok(())
}
}

View File

@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2;
async fn real_main() {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap();
let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
let req = Request::CreateContainer(ContainerConfig {
container_id: "xxx".to_owned(),