diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 1f5da937b5..ffe7de4038 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2526,7 +2526,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", - "tokio-vsock", + "tokio-vsock 0.3.1", "xattr", "zbus", ] diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 4795ffc141..6a5434e669 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -1064,7 +1064,7 @@ impl BaseContainer for LinuxContainer { // This is because we need to close the stdin fifo when the stdin stream // is drained. if let Some(mut stdin_stream) = proc_io.stdin.take() { - info!(logger, "copy from stdin to parent_stdin"); + debug!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let wgw_input = proc_io.wg_input.worker(); @@ -1078,7 +1078,7 @@ impl BaseContainer for LinuxContainer { res = stdin_stream.read(&mut buf) => { match res { Err(_) | Ok(0) => { - info!(logger, "copy from stdin to term_master end: {:?}", res); + debug!(logger, "copy from stdin to term_master end: {:?}", res); break; } Ok(n) => { @@ -1091,7 +1091,7 @@ impl BaseContainer for LinuxContainer { // As the stdin fifo is opened in RW mode in the shim, which will never // read EOF, we close the stdin fifo here when explicit requested. _ = close_stdin_rx.changed() => { - info!(logger, "copy ends as requested"); + debug!(logger, "copy ends as requested"); break } } @@ -1099,6 +1099,38 @@ impl BaseContainer for LinuxContainer { wgw_input.done(); }); } + + // copy from parent_stdout to stdout stream + if let Some(mut stdout_stream) = proc_io.stdout.take() { + debug!(logger, "copy from parent_stdout to stdout stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stdout = unsafe { File::from_raw_fd(p.parent_stdout.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stdout, &mut stdout_stream).await; + debug!( + logger, + "copy from parent_stdout to stdout stream end: {:?}", res + ); + wgw_output.done(); + }); + } + + // copy from parent_stderr to stderr stream + if let Some(mut stderr_stream) = proc_io.stderr.take() { + debug!(logger, "copy from parent_stderr to stderr stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stderr = unsafe { File::from_raw_fd(p.parent_stderr.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stderr, &mut stderr_stream).await; + debug!( + logger, + "copy from parent_stderr to stderr stream end: {:?}", res + ); + wgw_output.done(); + }); + } } } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index e865b7c9dc..efc99afd39 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -5,7 +5,7 @@ use libc::pid_t; use std::fs::File; -use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; +use std::os::unix::io::{AsRawFd, RawFd}; use tokio::sync::mpsc::Sender; use tokio_vsock::VsockStream; @@ -137,13 +137,6 @@ impl ProcessOperations for Process { } } -fn set_blocking(fd: RawFd) -> Result<()> { - let flags = fcntl(fd, FcntlArg::F_GETFL)?; - let new_flags = !OFlag::O_NONBLOCK & OFlag::from_bits_truncate(flags); - fcntl(fd, FcntlArg::F_SETFL(new_flags))?; - Ok(()) -} - impl Process { pub fn new( logger: &Logger, @@ -195,27 +188,17 @@ impl Process { p.parent_stdin = Some(pstdin); p.stdin = Some(stdin); - if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) { - let fd = stdout.into_raw_fd(); - // The stdout/stderr of the process should be blocking, otherwise - // the process may encounter EAGAIN error when writing to stdout/stderr. - set_blocking(fd)?; - p.stdout = Some(fd); - } else { - let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stdout = Some(pstdout); - p.stdout = Some(stdout); - } + // These pipes are necessary as the stdout/stderr of the child process + // cannot be a socket. Otherwise, some images relying on the /dev/stdout(stderr) + // and /proc/self/fd/1(2) will fail to boot as opening an existing socket + // is forbidden by the Linux kernel. + let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stdout = Some(pstdout); + p.stdout = Some(stdout); - if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) { - let fd = stderr.into_raw_fd(); - set_blocking(fd)?; - p.stderr = Some(fd); - } else { - let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stderr = Some(pstderr); - p.stderr = Some(stderr); - } + let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stderr = Some(pstderr); + p.stderr = Some(stderr); } } Ok(p) diff --git a/src/agent/src/signal.rs b/src/agent/src/signal.rs index eafec4bb04..62a2193719 100644 --- a/src/agent/src/signal.rs +++ b/src/agent/src/signal.rs @@ -69,7 +69,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc>) -> Result } }; - // In passfd io mode, when using tty, we need to wait for the copy task end. + // In passfd io mode, we need to wait for the copy task end. if let Some(proc_io) = &mut p.proc_io { proc_io.wg_output.wait().await; }