agent: Use pipes as stdout/stderr of container process

Linux forbids opening an existing socket through /proc/<pid>/fd/<fd>,
making some images relying on the special file /dev/stdout(stderr),
/proc/self/fd/1(2) fail to boot in passfd io mode, where the
stdout/stderr of a container process is a vsock socket.

For back compatibility, a pipe is introduced between the process
and the socket, and its read end is set as stdout/stderr of the
container process instead of the socket. The agent will do the
forwarding between the pipe and the socket.

Fixes: #6714
Signed-off-by: Zixuan Tan <tanzixuan.me@gmail.com>
This commit is contained in:
Zixuan Tan 2024-01-17 23:00:44 +08:00
parent f6710610d1
commit 1206de2c23
4 changed files with 48 additions and 33 deletions

2
src/agent/Cargo.lock generated
View File

@ -2526,7 +2526,7 @@ dependencies = [
"tempfile", "tempfile",
"test-utils", "test-utils",
"tokio", "tokio",
"tokio-vsock", "tokio-vsock 0.3.1",
"xattr", "xattr",
"zbus", "zbus",
] ]

View File

@ -1064,7 +1064,7 @@ impl BaseContainer for LinuxContainer {
// This is because we need to close the stdin fifo when the stdin stream // This is because we need to close the stdin fifo when the stdin stream
// is drained. // is drained.
if let Some(mut stdin_stream) = proc_io.stdin.take() { if let Some(mut stdin_stream) = proc_io.stdin.take() {
info!(logger, "copy from stdin to parent_stdin"); debug!(logger, "copy from stdin to parent_stdin");
let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker(); let wgw_input = proc_io.wg_input.worker();
@ -1078,7 +1078,7 @@ impl BaseContainer for LinuxContainer {
res = stdin_stream.read(&mut buf) => { res = stdin_stream.read(&mut buf) => {
match res { match res {
Err(_) | Ok(0) => { Err(_) | Ok(0) => {
info!(logger, "copy from stdin to term_master end: {:?}", res); debug!(logger, "copy from stdin to term_master end: {:?}", res);
break; break;
} }
Ok(n) => { Ok(n) => {
@ -1091,7 +1091,7 @@ impl BaseContainer for LinuxContainer {
// As the stdin fifo is opened in RW mode in the shim, which will never // As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested. // read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => { _ = close_stdin_rx.changed() => {
info!(logger, "copy ends as requested"); debug!(logger, "copy ends as requested");
break break
} }
} }
@ -1099,6 +1099,38 @@ impl BaseContainer for LinuxContainer {
wgw_input.done(); wgw_input.done();
}); });
} }
// copy from parent_stdout to stdout stream
if let Some(mut stdout_stream) = proc_io.stdout.take() {
debug!(logger, "copy from parent_stdout to stdout stream");
let wgw_output = proc_io.wg_output.worker();
let mut parent_stdout = unsafe { File::from_raw_fd(p.parent_stdout.unwrap()) };
let logger = logger.clone();
tokio::spawn(async move {
let res = tokio::io::copy(&mut parent_stdout, &mut stdout_stream).await;
debug!(
logger,
"copy from parent_stdout to stdout stream end: {:?}", res
);
wgw_output.done();
});
}
// copy from parent_stderr to stderr stream
if let Some(mut stderr_stream) = proc_io.stderr.take() {
debug!(logger, "copy from parent_stderr to stderr stream");
let wgw_output = proc_io.wg_output.worker();
let mut parent_stderr = unsafe { File::from_raw_fd(p.parent_stderr.unwrap()) };
let logger = logger.clone();
tokio::spawn(async move {
let res = tokio::io::copy(&mut parent_stderr, &mut stderr_stream).await;
debug!(
logger,
"copy from parent_stderr to stderr stream end: {:?}", res
);
wgw_output.done();
});
}
} }
} }

View File

@ -5,7 +5,7 @@
use libc::pid_t; use libc::pid_t;
use std::fs::File; use std::fs::File;
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_vsock::VsockStream; use tokio_vsock::VsockStream;
@ -137,13 +137,6 @@ impl ProcessOperations for Process {
} }
} }
fn set_blocking(fd: RawFd) -> Result<()> {
let flags = fcntl(fd, FcntlArg::F_GETFL)?;
let new_flags = !OFlag::O_NONBLOCK & OFlag::from_bits_truncate(flags);
fcntl(fd, FcntlArg::F_SETFL(new_flags))?;
Ok(())
}
impl Process { impl Process {
pub fn new( pub fn new(
logger: &Logger, logger: &Logger,
@ -195,29 +188,19 @@ impl Process {
p.parent_stdin = Some(pstdin); p.parent_stdin = Some(pstdin);
p.stdin = Some(stdin); p.stdin = Some(stdin);
if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) { // These pipes are necessary as the stdout/stderr of the child process
let fd = stdout.into_raw_fd(); // cannot be a socket. Otherwise, some images relying on the /dev/stdout(stderr)
// The stdout/stderr of the process should be blocking, otherwise // and /proc/self/fd/1(2) will fail to boot as opening an existing socket
// the process may encounter EAGAIN error when writing to stdout/stderr. // is forbidden by the Linux kernel.
set_blocking(fd)?;
p.stdout = Some(fd);
} else {
let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?;
p.parent_stdout = Some(pstdout); p.parent_stdout = Some(pstdout);
p.stdout = Some(stdout); p.stdout = Some(stdout);
}
if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) {
let fd = stderr.into_raw_fd();
set_blocking(fd)?;
p.stderr = Some(fd);
} else {
let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?;
p.parent_stderr = Some(pstderr); p.parent_stderr = Some(pstderr);
p.stderr = Some(stderr); p.stderr = Some(stderr);
} }
} }
}
Ok(p) Ok(p)
} }

View File

@ -69,7 +69,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
} }
}; };
// In passfd io mode, when using tty, we need to wait for the copy task end. // In passfd io mode, we need to wait for the copy task end.
if let Some(proc_io) = &mut p.proc_io { if let Some(proc_io) = &mut p.proc_io {
proc_io.wg_output.wait().await; proc_io.wg_output.wait().await;
} }