mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-22 13:38:26 +00:00
runtime-rs: fix possibe bug in ServiceManager::run()
Multiple instances of task service may get registered by ServiceManager::run(), fix it by making operation symmetric. Fixes: #7479 Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
parent
1a5f90dc3f
commit
1cc1c81c9a
@ -18,6 +18,7 @@ use containerd_shim_protos::{
|
||||
shim_async,
|
||||
};
|
||||
use runtimes::RuntimeHandlerManager;
|
||||
use shim_interface::KATA_PATH;
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
process::Command,
|
||||
@ -26,9 +27,9 @@ use tokio::{
|
||||
use ttrpc::asynchronous::Server;
|
||||
|
||||
use crate::task_service::TaskService;
|
||||
|
||||
/// message buffer size
|
||||
const MESSAGE_BUFFER_SIZE: usize = 8;
|
||||
use shim_interface::KATA_PATH;
|
||||
|
||||
pub struct ServiceManager {
|
||||
receiver: Option<Receiver<Message>>,
|
||||
@ -103,8 +104,7 @@ impl ServiceManager {
|
||||
task_server_fd: RawFd,
|
||||
) -> Result<Self> {
|
||||
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||
let rt_mgr = RuntimeHandlerManager::new(id, sender)
|
||||
.context("new runtime handler")?;
|
||||
let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?;
|
||||
let handler = Arc::new(rt_mgr);
|
||||
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||
task_server = task_server.set_domain_unix();
|
||||
@ -118,9 +118,10 @@ impl ServiceManager {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<()> {
|
||||
pub async fn run(mut self) -> Result<()> {
|
||||
info!(sl!(), "begin to run service");
|
||||
self.start().await.context("start service")?;
|
||||
self.registry_service().context("registry service")?;
|
||||
self.start_service().await.context("start service")?;
|
||||
|
||||
info!(sl!(), "wait server message");
|
||||
let mut rx = self.receiver.take();
|
||||
@ -128,10 +129,10 @@ impl ServiceManager {
|
||||
while let Some(r) = rx.recv().await {
|
||||
info!(sl!(), "receive action {:?}", &r.action);
|
||||
let result = match r.action {
|
||||
Action::Start => self.start().await.context("start listen"),
|
||||
Action::Stop => self.stop_listen().await.context("stop listen"),
|
||||
Action::Start => self.start_service().await.context("start listen"),
|
||||
Action::Stop => self.stop_service().await.context("stop listen"),
|
||||
Action::Shutdown => {
|
||||
self.stop_listen().await.context("stop listen")?;
|
||||
self.stop_service().await.context("stop listen")?;
|
||||
break;
|
||||
}
|
||||
Action::Event(event) => {
|
||||
@ -163,8 +164,7 @@ impl ServiceManager {
|
||||
|
||||
pub async fn cleanup(sid: &str) -> Result<()> {
|
||||
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||
let handler = RuntimeHandlerManager::new(sid, sender)
|
||||
.context("new runtime handler")?;
|
||||
let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?;
|
||||
handler.cleanup().await.context("runtime handler cleanup")?;
|
||||
let temp_dir = [KATA_PATH, sid].join("/");
|
||||
if std::fs::metadata(temp_dir.as_str()).is_ok() {
|
||||
@ -179,18 +179,24 @@ impl ServiceManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start(&mut self) -> Result<()> {
|
||||
fn registry_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.take() {
|
||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||
as Box<dyn shim_async::Task + Send + Sync>);
|
||||
let mut t = t.register_service(shim_async::create_task(task_service));
|
||||
t.start().await.context("task server start")?;
|
||||
let t = t.register_service(shim_async::create_task(task_service));
|
||||
self.task_server = Some(t);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_listen(&mut self) -> Result<()> {
|
||||
async fn start_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.as_mut() {
|
||||
t.start().await.context("task server start")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.as_mut() {
|
||||
t.stop_listen().await;
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ impl ShimExecutor {
|
||||
self.args.validate(false).context("validate")?;
|
||||
|
||||
let server_fd = get_server_fd().context("get server fd")?;
|
||||
let mut service_manager = service::ServiceManager::new(
|
||||
let service_manager = service::ServiceManager::new(
|
||||
&self.args.id,
|
||||
&self.args.publish_binary,
|
||||
&self.args.address,
|
||||
|
Loading…
Reference in New Issue
Block a user