agent: Fix close_stdin for passfd-io

In scenario passfd-io, we should wait for stdin to close itself
instead of manually intervening in it.

Signed-off-by: Tim Zhang <tim@hyper.sh>
Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
Tim Zhang
2024-04-16 20:33:06 +08:00
parent 221c5b51fe
commit d68eb7f0ad
2 changed files with 9 additions and 74 deletions

View File

@@ -65,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY
use crate::sync_with_async::{read_async, write_async}; use crate::sync_with_async::{read_async, write_async};
use async_trait::async_trait; use async_trait::async_trait;
use rlimit::{setrlimit, Resource, Rlim}; use rlimit::{setrlimit, Resource, Rlim};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncBufReadExt;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use kata_sys_util::hooks::HookStates; use kata_sys_util::hooks::HookStates;
@@ -1002,38 +1002,12 @@ impl BaseContainer for LinuxContainer {
// Copy from stdin to term_master // Copy from stdin to term_master
if let Some(mut stdin_stream) = proc_io.stdin.take() { if let Some(mut stdin_stream) = proc_io.stdin.take() {
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone(); let logger = logger.clone();
let term_closer = term_closer.clone(); let term_closer = term_closer.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = [0u8; 8192]; let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await;
loop { debug!(logger, "copy from stdin to term_master end: {:?}", res);
tokio::select! {
// Make sure stdin_stream is drained before exiting
biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
break;
}
Ok(n) => {
if term_master.write_all(&buf[..n]).await.is_err() {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
break
}
}
}
wgw_input.done();
std::mem::forget(term_master); // Avoid auto closing of term_master std::mem::forget(term_master); // Avoid auto closing of term_master
drop(term_closer); drop(term_closer);
}); });
@@ -1070,37 +1044,10 @@ impl BaseContainer for LinuxContainer {
if let Some(mut stdin_stream) = proc_io.stdin.take() { if let Some(mut stdin_stream) = proc_io.stdin.take() {
debug!(logger, "copy from stdin to parent_stdin"); debug!(logger, "copy from stdin to parent_stdin");
let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone(); let logger = logger.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = [0u8; 8192]; let res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin).await;
loop { debug!(logger, "copy from stdin to term_master end: {:?}", res);
tokio::select! {
// Make sure stdin_stream is drained before exiting
biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
break;
}
Ok(n) => {
if parent_stdin.write_all(&buf[..n]).await.is_err() {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
break
}
}
}
wgw_input.done();
}); });
} }

View File

@@ -54,11 +54,6 @@ pub struct ProcessIo {
pub stdin: Option<VsockStream>, pub stdin: Option<VsockStream>,
pub stdout: Option<VsockStream>, pub stdout: Option<VsockStream>,
pub stderr: Option<VsockStream>, pub stderr: Option<VsockStream>,
// used to close stdin stream
pub close_stdin_tx: tokio::sync::watch::Sender<bool>,
pub close_stdin_rx: tokio::sync::watch::Receiver<bool>,
// wait for stdin copy task to finish
pub wg_input: WaitGroup,
// used to wait for all process outputs to be copied to the vsock streams // used to wait for all process outputs to be copied to the vsock streams
// only used when tty is used. // only used when tty is used.
pub wg_output: WaitGroup, pub wg_output: WaitGroup,
@@ -70,15 +65,10 @@ impl ProcessIo {
stdout: Option<VsockStream>, stdout: Option<VsockStream>,
stderr: Option<VsockStream>, stderr: Option<VsockStream>,
) -> Self { ) -> Self {
let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false);
ProcessIo { ProcessIo {
stdin, stdin,
stdout, stdout,
stderr, stderr,
close_stdin_tx,
close_stdin_rx,
wg_input: WaitGroup::new(),
wg_output: WaitGroup::new(), wg_output: WaitGroup::new(),
} }
} }
@@ -210,11 +200,9 @@ impl Process {
} }
pub async fn close_stdin(&mut self) { pub async fn close_stdin(&mut self) {
if let Some(proc_io) = &mut self.proc_io { // stdin will be closed automatically in passfd-io senario
// notify io copy task to close stdin stream if self.proc_io.is_some() {
let _ = proc_io.close_stdin_tx.send(true); return;
// wait for io copy task to finish
proc_io.wg_input.wait().await;
} }
close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, term_master, TermMaster);