agent: disconnect rpc get_oom_event when destroy_sandbox.

Otherwise it would block the shutdown of ttrpc.

Fixes: #1524

Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
Tim Zhang 2021-03-18 16:00:25 +08:00
parent 50f317dcff
commit 8c4d3346d4
2 changed files with 25 additions and 11 deletions

View File

@ -1040,6 +1040,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
// destroy all containers, clean up, notify agent to exit // destroy all containers, clean up, notify agent to exit
// etc. // etc.
sandbox.destroy().await.unwrap(); sandbox.destroy().await.unwrap();
// Close get_oom_event connection,
// otherwise it will block the shutdown of ttrpc.
sandbox.event_tx.take();
sandbox.sender.take().unwrap().send(1).unwrap(); sandbox.sender.take().unwrap().send(1).unwrap();
Ok(Empty::new()) Ok(Empty::new())
@ -1188,15 +1192,16 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
drop(s); drop(s);
drop(sandbox); drop(sandbox);
match event_rx.recv().await { if let Some(container_id) = event_rx.recv().await {
None => Err(ttrpc_error(ttrpc::Code::INTERNAL, "")), info!(sl!(), "get_oom_event return {}", &container_id);
Some(container_id) => {
info!(sl!(), "get_oom_event return {}", &container_id); let mut resp = OOMEvent::new();
let mut resp = OOMEvent::new(); resp.container_id = container_id;
resp.container_id = container_id;
Ok(resp) return Ok(resp);
}
} }
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
} }
} }

View File

@ -47,7 +47,7 @@ pub struct Sandbox {
pub rtnl: Handle, pub rtnl: Handle,
pub hooks: Option<Hooks>, pub hooks: Option<Hooks>,
pub event_rx: Arc<Mutex<Receiver<String>>>, pub event_rx: Arc<Mutex<Receiver<String>>>,
pub event_tx: Sender<String>, pub event_tx: Option<Sender<String>>,
} }
impl Sandbox { impl Sandbox {
@ -76,7 +76,7 @@ impl Sandbox {
rtnl: Handle::new()?, rtnl: Handle::new()?,
hooks: None, hooks: None,
event_rx, event_rx,
event_tx: tx, event_tx: Some(tx),
}) })
} }
@ -311,9 +311,18 @@ impl Sandbox {
} }
pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) { pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) {
let tx = self.event_tx.clone();
let logger = self.logger.clone(); let logger = self.logger.clone();
if self.event_tx.is_none() {
error!(
logger,
"sandbox.event_tx not found in run_oom_event_monitor"
);
return;
}
let tx = self.event_tx.as_ref().unwrap().clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let event = rx.recv().await; let event = rx.recv().await;