diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 0e7fe73efd..cdecae1308 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -161,7 +161,7 @@ impl Process { pub fn notify_term_close(&mut self) { let notify = self.term_exit_notifier.clone(); - notify.notify_one(); + notify.notify_waiters(); } pub fn close_stdin(&mut self) { diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 988cce5b76..6c92eb774e 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -595,15 +595,16 @@ impl AgentService { let cid = req.container_id; let eid = req.exec_id; - let mut term_exit_notifier = Arc::new(tokio::sync::Notify::new()); + let term_exit_notifier; let reader = { let s = self.sandbox.clone(); let mut sandbox = s.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + term_exit_notifier = p.term_exit_notifier.clone(); + if p.term_master.is_some() { - term_exit_notifier = p.term_exit_notifier.clone(); p.get_reader(StreamType::TermMaster) } else if stdout { if p.parent_stdout.is_some() { @@ -623,9 +624,12 @@ impl AgentService { let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?; tokio::select! { - _ = term_exit_notifier.notified() => { - Err(anyhow!("eof")) - } + // Poll the futures in the order they appear from top to bottom + // it is very important to avoid data loss. If there is still + // data in the buffer and read_stream branch will return + // Poll::Ready so that the term_exit_notifier will never polled + // before all data were read. + biased; v = read_stream(reader, req.len as usize) => { let vector = v?; let mut resp = ReadStreamResponse::new(); @@ -633,6 +637,9 @@ impl AgentService { Ok(resp) } + _ = term_exit_notifier.notified() => { + Err(anyhow!("eof")) + } } } }