agent: Fix exec hang issues with a backgroud process

Issue #4747 and pull request #4748 fix exec hang issues where the exec
command hangs when a process's stdout is not closed. However, the PR might
cause the exec command not to work as expected, leading to CI failure. The
PR was reverted in #7042. This PR resolves the exec hang issues and has
undergone 1000 rounds of testing to verify that it would not cause any CI
failures.

Fixes: #4747

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
Signed-off-by: Xuewei Niu <niuxuewei.nxw@antgroup.com>
This commit is contained in:
Xuewei Niu 2023-07-16 08:32:45 +08:00
parent f6a51a8a78
commit 6c91af0a26
2 changed files with 13 additions and 6 deletions

View File

@ -161,7 +161,7 @@ impl Process {
pub fn notify_term_close(&mut self) { pub fn notify_term_close(&mut self) {
let notify = self.term_exit_notifier.clone(); let notify = self.term_exit_notifier.clone();
notify.notify_one(); notify.notify_waiters();
} }
pub fn close_stdin(&mut self) { pub fn close_stdin(&mut self) {

View File

@ -595,15 +595,16 @@ impl AgentService {
let cid = req.container_id; let cid = req.container_id;
let eid = req.exec_id; let eid = req.exec_id;
let mut term_exit_notifier = Arc::new(tokio::sync::Notify::new()); let term_exit_notifier;
let reader = { let reader = {
let s = self.sandbox.clone(); let s = self.sandbox.clone();
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
if p.term_master.is_some() {
term_exit_notifier = p.term_exit_notifier.clone(); term_exit_notifier = p.term_exit_notifier.clone();
if p.term_master.is_some() {
p.get_reader(StreamType::TermMaster) p.get_reader(StreamType::TermMaster)
} else if stdout { } else if stdout {
if p.parent_stdout.is_some() { if p.parent_stdout.is_some() {
@ -623,9 +624,12 @@ impl AgentService {
let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?; let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?;
tokio::select! { tokio::select! {
_ = term_exit_notifier.notified() => { // Poll the futures in the order they appear from top to bottom
Err(anyhow!("eof")) // it is very important to avoid data loss. If there is still
} // data in the buffer and read_stream branch will return
// Poll::Ready so that the term_exit_notifier will never polled
// before all data were read.
biased;
v = read_stream(reader, req.len as usize) => { v = read_stream(reader, req.len as usize) => {
let vector = v?; let vector = v?;
let mut resp = ReadStreamResponse::new(); let mut resp = ReadStreamResponse::new();
@ -633,6 +637,9 @@ impl AgentService {
Ok(resp) Ok(resp)
} }
_ = term_exit_notifier.notified() => {
Err(anyhow!("eof"))
}
} }
} }
} }