mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-22 05:28:25 +00:00
runtime-rs: fix possibe bug in ServiceManager::run()
Multiple instances of task service may get registered by ServiceManager::run(), fix it by making operation symmetric. Fixes: #7479 Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
parent
1a5f90dc3f
commit
1cc1c81c9a
@ -18,6 +18,7 @@ use containerd_shim_protos::{
|
|||||||
shim_async,
|
shim_async,
|
||||||
};
|
};
|
||||||
use runtimes::RuntimeHandlerManager;
|
use runtimes::RuntimeHandlerManager;
|
||||||
|
use shim_interface::KATA_PATH;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::AsyncWriteExt,
|
io::AsyncWriteExt,
|
||||||
process::Command,
|
process::Command,
|
||||||
@ -26,9 +27,9 @@ use tokio::{
|
|||||||
use ttrpc::asynchronous::Server;
|
use ttrpc::asynchronous::Server;
|
||||||
|
|
||||||
use crate::task_service::TaskService;
|
use crate::task_service::TaskService;
|
||||||
|
|
||||||
/// message buffer size
|
/// message buffer size
|
||||||
const MESSAGE_BUFFER_SIZE: usize = 8;
|
const MESSAGE_BUFFER_SIZE: usize = 8;
|
||||||
use shim_interface::KATA_PATH;
|
|
||||||
|
|
||||||
pub struct ServiceManager {
|
pub struct ServiceManager {
|
||||||
receiver: Option<Receiver<Message>>,
|
receiver: Option<Receiver<Message>>,
|
||||||
@ -103,8 +104,7 @@ impl ServiceManager {
|
|||||||
task_server_fd: RawFd,
|
task_server_fd: RawFd,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||||
let rt_mgr = RuntimeHandlerManager::new(id, sender)
|
let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?;
|
||||||
.context("new runtime handler")?;
|
|
||||||
let handler = Arc::new(rt_mgr);
|
let handler = Arc::new(rt_mgr);
|
||||||
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||||
task_server = task_server.set_domain_unix();
|
task_server = task_server.set_domain_unix();
|
||||||
@ -118,9 +118,10 @@ impl ServiceManager {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) -> Result<()> {
|
pub async fn run(mut self) -> Result<()> {
|
||||||
info!(sl!(), "begin to run service");
|
info!(sl!(), "begin to run service");
|
||||||
self.start().await.context("start service")?;
|
self.registry_service().context("registry service")?;
|
||||||
|
self.start_service().await.context("start service")?;
|
||||||
|
|
||||||
info!(sl!(), "wait server message");
|
info!(sl!(), "wait server message");
|
||||||
let mut rx = self.receiver.take();
|
let mut rx = self.receiver.take();
|
||||||
@ -128,10 +129,10 @@ impl ServiceManager {
|
|||||||
while let Some(r) = rx.recv().await {
|
while let Some(r) = rx.recv().await {
|
||||||
info!(sl!(), "receive action {:?}", &r.action);
|
info!(sl!(), "receive action {:?}", &r.action);
|
||||||
let result = match r.action {
|
let result = match r.action {
|
||||||
Action::Start => self.start().await.context("start listen"),
|
Action::Start => self.start_service().await.context("start listen"),
|
||||||
Action::Stop => self.stop_listen().await.context("stop listen"),
|
Action::Stop => self.stop_service().await.context("stop listen"),
|
||||||
Action::Shutdown => {
|
Action::Shutdown => {
|
||||||
self.stop_listen().await.context("stop listen")?;
|
self.stop_service().await.context("stop listen")?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Action::Event(event) => {
|
Action::Event(event) => {
|
||||||
@ -163,8 +164,7 @@ impl ServiceManager {
|
|||||||
|
|
||||||
pub async fn cleanup(sid: &str) -> Result<()> {
|
pub async fn cleanup(sid: &str) -> Result<()> {
|
||||||
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||||
let handler = RuntimeHandlerManager::new(sid, sender)
|
let handler = RuntimeHandlerManager::new(sid, sender).context("new runtime handler")?;
|
||||||
.context("new runtime handler")?;
|
|
||||||
handler.cleanup().await.context("runtime handler cleanup")?;
|
handler.cleanup().await.context("runtime handler cleanup")?;
|
||||||
let temp_dir = [KATA_PATH, sid].join("/");
|
let temp_dir = [KATA_PATH, sid].join("/");
|
||||||
if std::fs::metadata(temp_dir.as_str()).is_ok() {
|
if std::fs::metadata(temp_dir.as_str()).is_ok() {
|
||||||
@ -179,18 +179,24 @@ impl ServiceManager {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start(&mut self) -> Result<()> {
|
fn registry_service(&mut self) -> Result<()> {
|
||||||
if let Some(t) = self.task_server.take() {
|
if let Some(t) = self.task_server.take() {
|
||||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||||
as Box<dyn shim_async::Task + Send + Sync>);
|
as Box<dyn shim_async::Task + Send + Sync>);
|
||||||
let mut t = t.register_service(shim_async::create_task(task_service));
|
let t = t.register_service(shim_async::create_task(task_service));
|
||||||
t.start().await.context("task server start")?;
|
|
||||||
self.task_server = Some(t);
|
self.task_server = Some(t);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stop_listen(&mut self) -> Result<()> {
|
async fn start_service(&mut self) -> Result<()> {
|
||||||
|
if let Some(t) = self.task_server.as_mut() {
|
||||||
|
t.start().await.context("task server start")?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stop_service(&mut self) -> Result<()> {
|
||||||
if let Some(t) = self.task_server.as_mut() {
|
if let Some(t) = self.task_server.as_mut() {
|
||||||
t.stop_listen().await;
|
t.stop_listen().await;
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ impl ShimExecutor {
|
|||||||
self.args.validate(false).context("validate")?;
|
self.args.validate(false).context("validate")?;
|
||||||
|
|
||||||
let server_fd = get_server_fd().context("get server fd")?;
|
let server_fd = get_server_fd().context("get server fd")?;
|
||||||
let mut service_manager = service::ServiceManager::new(
|
let service_manager = service::ServiceManager::new(
|
||||||
&self.args.id,
|
&self.args.id,
|
||||||
&self.args.publish_binary,
|
&self.args.publish_binary,
|
||||||
&self.args.address,
|
&self.args.address,
|
||||||
|
Loading…
Reference in New Issue
Block a user