diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index e8499832b7..d63ae0ff29 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -30,7 +30,7 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" } rlimit = "0.5.3" cfg-if = "0.1.0" -tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] } +tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] } tokio-vsock = "0.3.1" futures = "0.3.17" async-trait = "0.1.31" diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 465667e3fb..4795ffc141 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -1015,7 +1015,7 @@ impl BaseContainer for LinuxContainer { break; } Ok(n) => { - if let Err(_) = term_master.write_all(&buf[..n]).await { + if term_master.write_all(&buf[..n]).await.is_err() { break; } } @@ -1035,11 +1035,12 @@ impl BaseContainer for LinuxContainer { }); } + // Copy from term_master to stdout if let Some(mut stdout_stream) = proc_io.stdout.take() { let wgw_output = proc_io.wg_output.worker(); let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let logger = logger.clone(); - let term_closer = term_closer.clone(); + let term_closer = term_closer; tokio::spawn(async move { let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; debug!(logger, "copy from term_master to stdout end: {:?}", res); @@ -1050,6 +1051,7 @@ impl BaseContainer for LinuxContainer { } } } else { + // not using a terminal let stdin = p.stdin.unwrap(); let stdout = p.stdout.unwrap(); let stderr = p.stderr.unwrap(); @@ -1058,8 +1060,11 @@ impl BaseContainer for LinuxContainer { child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; if let Some(proc_io) = &mut p.proc_io { - // Copy from stdin to parent_stdin + // Here we copy from vsock stdin stream to parent_stdin manually. + // This is because we need to close the stdin fifo when the stdin stream + // is drained. if let Some(mut stdin_stream) = proc_io.stdin.take() { + info!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let wgw_input = proc_io.wg_input.worker(); @@ -1073,11 +1078,11 @@ impl BaseContainer for LinuxContainer { res = stdin_stream.read(&mut buf) => { match res { Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); + info!(logger, "copy from stdin to term_master end: {:?}", res); break; } Ok(n) => { - if let Err(_) = parent_stdin.write_all(&buf[..n]).await { + if parent_stdin.write_all(&buf[..n]).await.is_err() { break; } } @@ -1086,7 +1091,7 @@ impl BaseContainer for LinuxContainer { // As the stdin fifo is opened in RW mode in the shim, which will never // read EOF, we close the stdin fifo here when explicit requested. _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); + info!(logger, "copy ends as requested"); break } } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 7b99820c5c..e865b7c9dc 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -5,7 +5,7 @@ use libc::pid_t; use std::fs::File; -use std::os::unix::io::{AsRawFd, RawFd, IntoRawFd}; +use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; use tokio::sync::mpsc::Sender; use tokio_vsock::VsockStream; @@ -195,7 +195,7 @@ impl Process { p.parent_stdin = Some(pstdin); p.stdin = Some(stdin); - if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { + if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) { let fd = stdout.into_raw_fd(); // The stdout/stderr of the process should be blocking, otherwise // the process may encounter EAGAIN error when writing to stdout/stderr. @@ -207,7 +207,7 @@ impl Process { p.stdout = Some(stdout); } - if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { + if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) { let fd = stderr.into_raw_fd(); set_blocking(fd)?; p.stderr = Some(fd); @@ -241,8 +241,8 @@ impl Process { } pub fn cleanup_process_stream(&mut self) { - if let Some(_) = self.proc_io.take() { - // taken + if let Some(proc_io) = self.proc_io.take() { + drop(proc_io); return; } diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs index 314f66abfe..3175572e90 100644 --- a/src/agent/src/passfd_io.rs +++ b/src/agent/src/passfd_io.rs @@ -22,12 +22,10 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> { let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?; tokio::spawn(async move { loop { - if let Ok((stream, peer_addr)) = listener.accept().await { - if let Vsock(addr) = peer_addr { - let port = addr.port(); - info!(sl(), "accept connection from peer port {}", port); - HVSOCK_STREAMS.lock().await.insert(port, stream); - } + if let Ok((stream, Vsock(addr))) = listener.accept().await { + let port = addr.port(); + info!(sl(), "accept connection from peer port {}", port); + HVSOCK_STREAMS.lock().await.insert(port, stream); } } }); diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 5b570fb277..0cfc88409a 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -210,20 +210,17 @@ impl Container { .init_process .passfd_io .as_ref() - .map(|io| io.stdin_port) - .flatten(), + .and_then(|io| io.stdin_port), stdout_port: inner .init_process .passfd_io .as_ref() - .map(|io| io.stdout_port) - .flatten(), + .and_then(|io| io.stdout_port), stderr_port: inner .init_process .passfd_io .as_ref() - .map(|io| io.stderr_port) - .flatten(), + .and_then(|io| io.stderr_port), ..Default::default() }; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index 1433f64475..1b6fcb2aa5 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -87,24 +87,17 @@ impl ContainerInner { process_id: process.clone().into(), string_user: None, process: Some(exec.oci_process.clone()), - stdin_port: exec - .process - .passfd_io - .as_ref() - .map(|io| io.stdin_port) - .flatten(), + stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port), stdout_port: exec .process .passfd_io .as_ref() - .map(|io| io.stdout_port) - .flatten(), + .and_then(|io| io.stdout_port), stderr_port: exec .process .passfd_io .as_ref() - .map(|io| io.stderr_port) - .flatten(), + .and_then(|io| io.stderr_port), }) .await .context("exec process")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 29d2fe5923..6f360300a7 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -85,7 +85,7 @@ impl PassfdIo { .read(true) .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stdin) + .open(stdin) .context("open stdin")?; let hostport = passfd_connect(uds_path, passfd_port, fin.into()) @@ -99,7 +99,7 @@ impl PassfdIo { let fout = OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stdout) + .open(stdout) .context("open stdout")?; let hostport = passfd_connect(uds_path, passfd_port, fout.into()) @@ -115,7 +115,7 @@ impl PassfdIo { let ferr = OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stderr) + .open(stderr) .context("open stderr")?; let hostport = passfd_connect(uds_path, passfd_port, ferr.into()) diff --git a/src/tools/runk/libcontainer/src/container.rs b/src/tools/runk/libcontainer/src/container.rs index c7c3e068be..5a2bb0a6c8 100644 --- a/src/tools/runk/libcontainer/src/container.rs +++ b/src/tools/runk/libcontainer/src/container.rs @@ -294,6 +294,7 @@ impl ContainerLauncher { &self.id, self.init, 0, + None, )?) } else { Err(anyhow!("no process configuration"))