mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-22 13:38:26 +00:00
runtime-rs: simplify implementation of service crate
Simplify implementation of service crate. Fixes: #7479 Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
parent
61a8eabf8e
commit
1a5f90dc3f
@ -221,7 +221,7 @@ impl std::fmt::Debug for RuntimeHandlerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RuntimeHandlerManager {
|
impl RuntimeHandlerManager {
|
||||||
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
|
pub fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
|
inner: Arc::new(RwLock::new(RuntimeHandlerManagerInner::new(
|
||||||
id, msg_sender,
|
id, msg_sender,
|
||||||
|
@ -94,6 +94,7 @@ async fn send_event(
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceManager {
|
impl ServiceManager {
|
||||||
|
// TODO: who manages lifecycle for `task_server_fd`?
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
id: &str,
|
id: &str,
|
||||||
containerd_binary: &str,
|
containerd_binary: &str,
|
||||||
@ -102,11 +103,9 @@ impl ServiceManager {
|
|||||||
task_server_fd: RawFd,
|
task_server_fd: RawFd,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||||
let handler = Arc::new(
|
let rt_mgr = RuntimeHandlerManager::new(id, sender)
|
||||||
RuntimeHandlerManager::new(id, sender)
|
.context("new runtime handler")?;
|
||||||
.await
|
let handler = Arc::new(rt_mgr);
|
||||||
.context("new runtime handler")?,
|
|
||||||
);
|
|
||||||
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||||
task_server = task_server.set_domain_unix();
|
task_server = task_server.set_domain_unix();
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
@ -121,7 +120,7 @@ impl ServiceManager {
|
|||||||
|
|
||||||
pub async fn run(&mut self) -> Result<()> {
|
pub async fn run(&mut self) -> Result<()> {
|
||||||
info!(sl!(), "begin to run service");
|
info!(sl!(), "begin to run service");
|
||||||
self.start().await.context("start")?;
|
self.start().await.context("start service")?;
|
||||||
|
|
||||||
info!(sl!(), "wait server message");
|
info!(sl!(), "wait server message");
|
||||||
let mut rx = self.receiver.take();
|
let mut rx = self.receiver.take();
|
||||||
@ -144,8 +143,7 @@ impl ServiceManager {
|
|||||||
event,
|
event,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("send event")?;
|
.context("send event")
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -166,7 +164,6 @@ impl ServiceManager {
|
|||||||
pub async fn cleanup(sid: &str) -> Result<()> {
|
pub async fn cleanup(sid: &str) -> Result<()> {
|
||||||
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||||
let handler = RuntimeHandlerManager::new(sid, sender)
|
let handler = RuntimeHandlerManager::new(sid, sender)
|
||||||
.await
|
|
||||||
.context("new runtime handler")?;
|
.context("new runtime handler")?;
|
||||||
handler.cleanup().await.context("runtime handler cleanup")?;
|
handler.cleanup().await.context("runtime handler cleanup")?;
|
||||||
let temp_dir = [KATA_PATH, sid].join("/");
|
let temp_dir = [KATA_PATH, sid].join("/");
|
||||||
@ -183,31 +180,20 @@ impl ServiceManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn start(&mut self) -> Result<()> {
|
async fn start(&mut self) -> Result<()> {
|
||||||
|
if let Some(t) = self.task_server.take() {
|
||||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||||
as Box<dyn shim_async::Task + Send + Sync>);
|
as Box<dyn shim_async::Task + Send + Sync>);
|
||||||
let task_server = self.task_server.take();
|
|
||||||
let task_server = match task_server {
|
|
||||||
Some(t) => {
|
|
||||||
let mut t = t.register_service(shim_async::create_task(task_service));
|
let mut t = t.register_service(shim_async::create_task(task_service));
|
||||||
t.start().await.context("task server start")?;
|
t.start().await.context("task server start")?;
|
||||||
Some(t)
|
self.task_server = Some(t);
|
||||||
}
|
}
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.task_server = task_server;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stop_listen(&mut self) -> Result<()> {
|
async fn stop_listen(&mut self) -> Result<()> {
|
||||||
let task_server = self.task_server.take();
|
if let Some(t) = self.task_server.as_mut() {
|
||||||
let task_server = match task_server {
|
|
||||||
Some(mut t) => {
|
|
||||||
t.stop_listen().await;
|
t.stop_listen().await;
|
||||||
Some(t)
|
|
||||||
}
|
}
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
self.task_server = task_server;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ const WORKER_THREADS: usize = 2;
|
|||||||
|
|
||||||
async fn real_main() {
|
async fn real_main() {
|
||||||
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||||
let manager = RuntimeHandlerManager::new("xxx", sender).await.unwrap();
|
let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
|
||||||
|
|
||||||
let req = Request::CreateContainer(ContainerConfig {
|
let req = Request::CreateContainer(ContainerConfig {
|
||||||
container_id: "xxx".to_owned(),
|
container_id: "xxx".to_owned(),
|
||||||
|
Loading…
Reference in New Issue
Block a user