runtime-rs: fix possible bug in ServiceManager::run()

Multiple instances of the task service may get registered by
ServiceManager::run(); fix it by making the start and stop operations symmetric.

Fixes: #7479

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-07-28 14:14:12 +08:00
parent 1a5f90dc3f
commit 1cc1c81c9a
2 changed files with 21 additions and 15 deletions

View File

@ -18,6 +18,7 @@ use containerd_shim_protos::{
shim_async, shim_async,
}; };
use runtimes::RuntimeHandlerManager; use runtimes::RuntimeHandlerManager;
use shim_interface::KATA_PATH;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
process::Command, process::Command,
@ -26,9 +27,9 @@ use tokio::{
use ttrpc::asynchronous::Server; use ttrpc::asynchronous::Server;
use crate::task_service::TaskService; use crate::task_service::TaskService;
/// message buffer size /// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8; const MESSAGE_BUFFER_SIZE: usize = 8;
use shim_interface::KATA_PATH;
pub struct ServiceManager { pub struct ServiceManager {
receiver: Option<Receiver<Message>>, receiver: Option<Receiver<Message>>,
@ -103,8 +104,7 @@ impl ServiceManager {
task_server_fd: RawFd, task_server_fd: RawFd,
) -> Result<Self> { ) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let rt_mgr = RuntimeHandlerManager::new(id, sender) let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?;
.context("new runtime handler")?;
let handler = Arc::new(rt_mgr); let handler = Arc::new(rt_mgr);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix(); task_server = task_server.set_domain_unix();
@ -118,9 +118,10 @@ impl ServiceManager {
}) })
} }
pub async fn run(&mut self) -> Result<()> { pub async fn run(mut self) -> Result<()> {
info!(sl!(), "begin to run service"); info!(sl!(), "begin to run service");
self.start().await.context("start service")?; self.registry_service().context("registry service")?;
self.start_service().await.context("start service")?;
info!(sl!(), "wait server message"); info!(sl!(), "wait server message");
let mut rx = self.receiver.take(); let mut rx = self.receiver.take();
@ -128,10 +129,10 @@ impl ServiceManager {
while let Some(r) = rx.recv().await { while let Some(r) = rx.recv().await {
info!(sl!(), "receive action {:?}", &r.action); info!(sl!(), "receive action {:?}", &r.action);
let result = match r.action { let result = match r.action {
Action::Start => self.start().await.context("start listen"), Action::Start => self.start_service().await.context("start listen"),
Action::Stop => self.stop_listen().await.context("stop listen"), Action::Stop => self.stop_service().await.context("stop listen"),
Action::Shutdown => { Action::Shutdown => {
self.stop_listen().await.context("stop listen")?; self.stop_service().await.context("stop listen")?;
break; break;
} }
Action::Event(event) => { Action::Event(event) => {
@ -163,8 +164,7 @@ impl ServiceManager {
pub async fn cleanup(sid: &str) -> Result<()> { pub async fn cleanup(sid: &str) -> Result<()> {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = RuntimeHandlerManager::new(sid, sender) let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?;
.context("new runtime handler")?;
handler.cleanup().await.context("runtime handler cleanup")?; handler.cleanup().await.context("runtime handler cleanup")?;
let temp_dir = [KATA_PATH, sid].join("/"); let temp_dir = [KATA_PATH, sid].join("/");
if std::fs::metadata(temp_dir.as_str()).is_ok() { if std::fs::metadata(temp_dir.as_str()).is_ok() {
@ -179,18 +179,24 @@ impl ServiceManager {
Ok(()) Ok(())
} }
async fn start(&mut self) -> Result<()> { fn registry_service(&mut self) -> Result<()> {
if let Some(t) = self.task_server.take() { if let Some(t) = self.task_server.take() {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>); as Box<dyn shim_async::Task + Send + Sync>);
let mut t = t.register_service(shim_async::create_task(task_service)); let t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
self.task_server = Some(t); self.task_server = Some(t);
} }
Ok(()) Ok(())
} }
async fn stop_listen(&mut self) -> Result<()> { async fn start_service(&mut self) -> Result<()> {
if let Some(t) = self.task_server.as_mut() {
t.start().await.context("task server start")?;
}
Ok(())
}
async fn stop_service(&mut self) -> Result<()> {
if let Some(t) = self.task_server.as_mut() { if let Some(t) = self.task_server.as_mut() {
t.stop_listen().await; t.stop_listen().await;
} }

View File

@ -46,7 +46,7 @@ impl ShimExecutor {
self.args.validate(false).context("validate")?; self.args.validate(false).context("validate")?;
let server_fd = get_server_fd().context("get server fd")?; let server_fd = get_server_fd().context("get server fd")?;
let mut service_manager = service::ServiceManager::new( let service_manager = service::ServiceManager::new(
&self.args.id, &self.args.id,
&self.args.publish_binary, &self.args.publish_binary,
&self.args.address, &self.args.address,