diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 169c3b68d0..bd5e97e453 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -20,9 +20,9 @@ async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result { let mut stream = UnixStream::connect(&uds).await.context("connect")?; stream.write_all(b"passfd\n").await.context("write all")?; - // We want the io connection keep connected when the containerd closed the io pipe, - // thus it can be attached on the io stream. - let buf = format!("{} keep", port); + // Since we have already keep stdin_w/stdout_r/stderr_r, "keep" of passfd is no longer needed. + // Also, we can't keep connection here or the stdin would be stuck. + let buf = format!("{}", port); stream .send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()]) .context("send port and fd")?; @@ -78,16 +78,9 @@ impl PassfdIo { passfd_port: u32, terminal: bool, ) -> Result<()> { - // In linux, when a FIFO is opened and there are no writers, the reader - // will continuously receive the HUP event. This can be problematic - // when creating containers in detached mode, as the stdin FIFO writer - // is closed after the container is created, resulting in this situation. 
- // - // See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll if let Some(stdin) = &self.stdin { let fin = OpenOptions::new() .read(true) - .write(true) .custom_flags(libc::O_NONBLOCK) .open(stdin) .context("open stdin")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs index db3ce99891..f6fdfcb81f 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs @@ -6,21 +6,20 @@ use std::{ io, - os::unix::{ - io::{FromRawFd, RawFd}, - net::UnixStream as StdUnixStream, - prelude::AsRawFd, + os::{ + fd::IntoRawFd, + unix::{ + fs::OpenOptionsExt, + io::{FromRawFd, RawFd}, + net::UnixStream as StdUnixStream, + prelude::AsRawFd, + }, }, pin::Pin, - task::Context as TaskContext, - task::Poll, + task::{Context as TaskContext, Poll}, }; use anyhow::{anyhow, Context, Result}; -use nix::{ - fcntl::{self, OFlag}, - sys::stat::Mode, -}; use tokio::{ fs::OpenOptions, io::{AsyncRead, AsyncWrite}, @@ -28,13 +27,16 @@ use tokio::{ }; use url::Url; -fn open_fifo(path: &str) -> Result<AsyncUnixStream> { - let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?; - +fn open_fifo_write(path: &str) -> Result<AsyncUnixStream> { + let std_file = std::fs::OpenOptions::new() + .write(true) + // O_NONBLOCK here is not for opening the FIFO without blocking, but to get a non-blocking + // stream that will be added into the tokio runtime. 
+ .custom_flags(libc::O_NONBLOCK) + .open(path) + .with_context(|| format!("open {} with write", path))?; + let fd = std_file.into_raw_fd(); let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) }; - std_stream - .set_nonblocking(true) - .context("set nonblocking")?; AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e)) } @@ -67,13 +69,10 @@ impl ShimIo { let stdin_fd: Option> = if let Some(stdin) = stdin { info!(sl!(), "open stdin {:?}", &stdin); - // Since the stdin peer point (which is hold by containerd) could not be openned - // immediately, which would block here's open with block mode, and we wouldn't want to - // block here, thus here opened with nonblock and then reset it to block mode for - // tokio async io. + // O_NONBLOCK lets this read-only open return at once even if no writer has opened the + // FIFO yet; the keep-alive writer is opened separately by Process::post_fifos_open. match OpenOptions::new() .read(true) - .write(false) .custom_flags(libc::O_NONBLOCK) .open(&stdin) .await @@ -118,7 +117,7 @@ impl ShimIo { if let Some(url) = url { if url.scheme() == "fifo" { let path = url.path(); - match open_fifo(path) { + match open_fifo_write(path) { Ok(s) => { return Some(Box::new(ShimIoWrite::Stream(s))); } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 6cc7557702..34e9a9a60b 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -5,7 +5,7 @@ // use std::collections::HashMap; -use std::sync::Arc; +use std::{fs::File, os::unix::fs::OpenOptionsExt, sync::Arc}; use agent::Agent; use anyhow::{Context, Result}; @@ -33,6 +33,26 @@ pub struct Process { pub stdin: Option, pub stdout: Option, pub stderr: Option, + + // In Linux, when a FIFO is opened and there are no writers, the reader + // will continuously receive the HUP event. 
This can be problematic. + // To avoid this problem, we open stdin in write mode and keep the stdin writer. + pub stdin_w: Option<File>, + // We need to open stdout in read mode and keep the read endpoint open + // until the process is deleted. Otherwise, + // the process may exit before the containerd side opens and reads + // the stdout fifo: Kata would write all of the stdout contents into + // the stdout fifo and then close the write endpoint. When containerd then + // opens the stdout fifo and tries to read, the write side is already closed, + // so containerd would block on the read forever. + // Here we keep the stdout/stderr read endpoint File in the process struct, + // which is destroyed when containerd sends the delete rpc call; + // by that time containerd has waited for the stdout read to return, which + // makes sure the contents in the stdout/stderr fifo are not lost. + pub stdout_r: Option<File>, + // The purpose is the same as stdout_r + pub stderr_r: Option<File>, + pub terminal: bool, pub height: u32, @@ -51,6 +71,24 @@ pub struct Process { pub passfd_io: Option<PassfdIo>, } +fn open_fifo(path: &str, is_read: bool, is_write: bool) -> Result<File> { + let file = std::fs::OpenOptions::new() + .read(is_read) + .write(is_write) + .custom_flags(libc::O_NONBLOCK) + .open(path)?; + + Ok(file) +} + +fn open_fifo_read(path: &str) -> Result<File> { + open_fifo(path, true, false) +} + +fn open_fifo_write(path: &str) -> Result<File> { + open_fifo(path, false, true) +} + impl Process { pub fn new( process: &ContainerProcess, @@ -71,6 +109,9 @@ impl Process { stdin, stdout, stderr, + stdin_w: None, + stdout_r: None, + stderr_r: None, terminal, height: 0, width: 0, @@ -83,6 +124,25 @@ impl Process { } } + pub fn pre_fifos_open(&mut self) -> Result<()> { + if let Some(ref stdout) = self.stdout { + self.stdout_r = Some(open_fifo_read(stdout)?); + } + + if let Some(ref stderr) = self.stderr { + self.stderr_r = Some(open_fifo_read(stderr)?); + } + + Ok(()) + } + + pub fn post_fifos_open(&mut self) 
-> Result<()> { + if let Some(ref stdin) = self.stdin { + self.stdin_w = Some(open_fifo_write(stdin)?); + } + Ok(()) + } + /// Init the `passfd_io` struct and vsock connections for io to the agent. pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> { info!(self.logger, "passfd io init"); @@ -90,10 +150,12 @@ impl Process { let mut passfd_io = PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await; + self.pre_fifos_open()?; passfd_io .open_and_passfd(hvsock_uds_path, passfd_port, self.terminal) .await .context("passfd connect")?; + self.post_fifos_open()?; self.passfd_io = Some(passfd_io); @@ -176,10 +238,12 @@ impl Process { ) -> Result<()> { info!(self.logger, "start io and wait"); + self.pre_fifos_open()?; // new shim io let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr) .await .context("new shim io")?; + self.post_fifos_open()?; // start io copy for stdin let wgw_stdin = self.wg_stdin.worker(); @@ -337,6 +401,10 @@ impl Process { /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { + // Close the stdin writer keeper so that + // the end signal could be received in the read side + self.stdin_w.take(); + // In passfd io mode, the stdin close and sync logic is handled // in the agent side. if self.passfd_io.is_none() {