diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 6b01c3b12e..2b3a7a4b70 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -65,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY use crate::sync_with_async::{read_async, write_async}; use async_trait::async_trait; use rlimit::{setrlimit, Resource, Rlim}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncBufReadExt; use tokio::sync::Mutex; use kata_sys_util::hooks::HookStates; @@ -1002,38 +1002,12 @@ impl BaseContainer for LinuxContainer { // Copy from stdin to term_master if let Some(mut stdin_stream) = proc_io.stdin.take() { let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; - let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); - let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); let term_closer = term_closer.clone(); tokio::spawn(async move { - let mut buf = [0u8; 8192]; - loop { - tokio::select! { - // Make sure stdin_stream is drained before exiting - biased; - res = stdin_stream.read(&mut buf) => { - match res { - Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); - break; - } - Ok(n) => { - if term_master.write_all(&buf[..n]).await.is_err() { - break; - } - } - } - } - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin fifo here when explicit requested. - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); - break - } - } - } - wgw_input.done(); + let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; + debug!(logger, "copy from stdin to term_master end: {:?}", res); + std::mem::forget(term_master); // Avoid auto closing of term_master drop(term_closer); }); @@ -1070,37 +1044,10 @@ impl BaseContainer for LinuxContainer { if let Some(mut stdin_stream) = proc_io.stdin.take() { debug!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; - let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); - let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); tokio::spawn(async move { - let mut buf = [0u8; 8192]; - loop { - tokio::select! { - // Make sure stdin_stream is drained before exiting - biased; - res = stdin_stream.read(&mut buf) => { - match res { - Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); - break; - } - Ok(n) => { - if parent_stdin.write_all(&buf[..n]).await.is_err() { - break; - } - } - } - } - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin fifo here when explicit requested. - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); - break - } - } - } - wgw_input.done(); + let res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin).await; + debug!(logger, "copy from stdin to term_master end: {:?}", res); }); } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index efc99afd39..853123cbcf 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -54,11 +54,6 @@ pub struct ProcessIo { pub stdin: Option, pub stdout: Option, pub stderr: Option, - // used to close stdin stream - pub close_stdin_tx: tokio::sync::watch::Sender, - pub close_stdin_rx: tokio::sync::watch::Receiver, - // wait for stdin copy task to finish - pub wg_input: WaitGroup, // used to wait for all process outputs to be copied to the vsock streams // only used when tty is used. pub wg_output: WaitGroup, @@ -70,15 +65,10 @@ impl ProcessIo { stdout: Option, stderr: Option, ) -> Self { - let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false); - ProcessIo { stdin, stdout, stderr, - close_stdin_tx, - close_stdin_rx, - wg_input: WaitGroup::new(), wg_output: WaitGroup::new(), } } @@ -210,11 +200,9 @@ impl Process { } pub async fn close_stdin(&mut self) { - if let Some(proc_io) = &mut self.proc_io { - // notify io copy task to close stdin stream - let _ = proc_io.close_stdin_tx.send(true); - // wait for io copy task to finish - proc_io.wg_input.wait().await; + // stdin will be closed automatically in passfd-io senario + if self.proc_io.is_some() { + return; } close_process_stream!(self, term_master, TermMaster);