runtime-rs: fix FIFO handling

Fixes: #9334

In linux, when a FIFO is opened and there are no writers, the reader
will continuously receive the HUP event. This can be problematic.
To avoid this problem, we open stdin in write mode and keep the stdin-writer

We need to open the stdout/stderr as the read mode and keep the open endpoint
until the process is delete. otherwise,
the process would exit before the containerd side open and read
the stdout fifo, thus runD would write all of the stdout contents into
the stdout fifo and then closed the write endpoint. Then, containerd
open the stdout fifo and try to read, since the write side had closed,
thus containerd would block on the read forever.
Here we keep the stdout/stderr read endpoint File in the common_process,
which would be destroied when containerd send the delete rpc call,
at this time the containerd had waited the stdout read return, thus it
can make sure the contents in the stdout/stderr fifo wouldn't be lost.

Signed-off-by: Tim Zhang <tim@hyper.sh>
Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
Tim Zhang
2024-04-16 20:48:31 +08:00
parent d68eb7f0ad
commit 8441187d5e
3 changed files with 93 additions and 33 deletions

View File

@@ -20,9 +20,9 @@ async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result<u32> {
let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream.write_all(b"passfd\n").await.context("write all")?;
// We want the io connection keep connected when the containerd closed the io pipe,
// thus it can be attached on the io stream.
let buf = format!("{} keep", port);
// Since we have already keep stdin_w/stdout_r/stderr_r, "keep" of passfd is no longer needed.
// Also, we can't keep connection here or the stdin would be stuck.
let buf = format!("{}", port);
stream
.send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()])
.context("send port and fd")?;
@@ -78,16 +78,9 @@ impl PassfdIo {
passfd_port: u32,
terminal: bool,
) -> Result<()> {
// In linux, when a FIFO is opened and there are no writers, the reader
// will continuously receive the HUP event. This can be problematic
// when creating containers in detached mode, as the stdin FIFO writer
// is closed after the container is created, resulting in this situation.
//
// See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll
if let Some(stdin) = &self.stdin {
let fin = OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(stdin)
.context("open stdin")?;

View File

@@ -6,21 +6,20 @@
use std::{
io,
os::unix::{
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
os::{
fd::IntoRawFd,
unix::{
fs::OpenOptionsExt,
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
},
},
pin::Pin,
task::Context as TaskContext,
task::Poll,
task::{Context as TaskContext, Poll},
};
use anyhow::{anyhow, Context, Result};
use nix::{
fcntl::{self, OFlag},
sys::stat::Mode,
};
use tokio::{
fs::OpenOptions,
io::{AsyncRead, AsyncWrite},
@@ -28,13 +27,16 @@ use tokio::{
};
use url::Url;
fn open_fifo(path: &str) -> Result<AsyncUnixStream> {
let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?;
fn open_fifo_write(path: &str) -> Result<AsyncUnixStream> {
let std_file = std::fs::OpenOptions::new()
.write(true)
// It's not for non-block openning FIFO but for non-block stream which
// will be add into tokio runtime.
.custom_flags(libc::O_NONBLOCK)
.open(path)
.with_context(|| format!("open {} with write", path))?;
let fd = std_file.into_raw_fd();
let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) };
std_stream
.set_nonblocking(true)
.context("set nonblocking")?;
AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e))
}
@@ -67,13 +69,10 @@ impl ShimIo {
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
info!(sl!(), "open stdin {:?}", &stdin);
// Since the stdin peer point (which is hold by containerd) could not be openned
// immediately, which would block here's open with block mode, and we wouldn't want to
// block here, thus here opened with nonblock and then reset it to block mode for
// tokio async io.
// Since we had opened the stdin as write mode in the Process::new function,
// thus it wouldn't be blocked to open it as read mode.
match OpenOptions::new()
.read(true)
.write(false)
.custom_flags(libc::O_NONBLOCK)
.open(&stdin)
.await
@@ -118,7 +117,7 @@ impl ShimIo {
if let Some(url) = url {
if url.scheme() == "fifo" {
let path = url.path();
match open_fifo(path) {
match open_fifo_write(path) {
Ok(s) => {
return Some(Box::new(ShimIoWrite::Stream(s)));
}

View File

@@ -5,7 +5,7 @@
//
use std::collections::HashMap;
use std::sync::Arc;
use std::{fs::File, os::unix::fs::OpenOptionsExt, sync::Arc};
use agent::Agent;
use anyhow::{Context, Result};
@@ -33,6 +33,26 @@ pub struct Process {
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
// In linux, when a FIFO is opened and there are no writers, the reader
// will continuously receive the HUP event. This can be problematic.
// To avoid this problem, we open stdin in write mode and keep the stdin-writer
pub stdin_w: Option<File>,
// We need to open the stdout as the read mode and keep the open endpoint
// until the process is delete. otherwise,
// the process would exit before the containerd side open and read
// the stdout fifo, thus Kata would write all of the stdout contents into
// the stdout fifo and then closed the write endpoint. Then, containerd
// open the stdout fifo and try to read, since the write side had closed,
// thus containerd would block on the read forever.
// Here we keep the stdout/stderr read endpoint File in the process struct,
// which would be destroied when containerd send the delete rpc call,
// at this time the containerd had waited the stdout read return, thus it
// can make sure the contents in the stdout/stderr fifo wouldn't be lost.
pub stdout_r: Option<File>,
// The purpose is the same as stdout_r
pub stderr_r: Option<File>,
pub terminal: bool,
pub height: u32,
@@ -51,6 +71,24 @@ pub struct Process {
pub passfd_io: Option<PassfdIo>,
}
fn open_fifo(path: &str, is_read: bool, is_write: bool) -> Result<File> {
let file = std::fs::OpenOptions::new()
.read(is_read)
.write(is_write)
.custom_flags(libc::O_NONBLOCK)
.open(path)?;
Ok(file)
}
fn open_fifo_read(path: &str) -> Result<File> {
open_fifo(path, true, false)
}
fn open_fifo_write(path: &str) -> Result<File> {
open_fifo(path, false, true)
}
impl Process {
pub fn new(
process: &ContainerProcess,
@@ -71,6 +109,9 @@ impl Process {
stdin,
stdout,
stderr,
stdin_w: None,
stdout_r: None,
stderr_r: None,
terminal,
height: 0,
width: 0,
@@ -83,6 +124,25 @@ impl Process {
}
}
pub fn pre_fifos_open(&mut self) -> Result<()> {
if let Some(ref stdout) = self.stdout {
self.stdout_r = Some(open_fifo_read(stdout)?);
}
if let Some(ref stderr) = self.stderr {
self.stderr_r = Some(open_fifo_read(stderr)?);
}
Ok(())
}
pub fn post_fifos_open(&mut self) -> Result<()> {
if let Some(ref stdin) = self.stdin {
self.stdin_w = Some(open_fifo_write(stdin)?);
}
Ok(())
}
/// Init the `passfd_io` struct and vsock connections for io to the agent.
pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> {
info!(self.logger, "passfd io init");
@@ -90,10 +150,12 @@ impl Process {
let mut passfd_io =
PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await;
self.pre_fifos_open()?;
passfd_io
.open_and_passfd(hvsock_uds_path, passfd_port, self.terminal)
.await
.context("passfd connect")?;
self.post_fifos_open()?;
self.passfd_io = Some(passfd_io);
@@ -176,10 +238,12 @@ impl Process {
) -> Result<()> {
info!(self.logger, "start io and wait");
self.pre_fifos_open()?;
// new shim io
let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr)
.await
.context("new shim io")?;
self.post_fifos_open()?;
// start io copy for stdin
let wgw_stdin = self.wg_stdin.worker();
@@ -337,6 +401,10 @@ impl Process {
/// Close the stdin of the process in container.
pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
// Close the stdin writer keeper so that
// the end signal could be received in the read side
self.stdin_w.take();
// In passfd io mode, the stdin close and sync logic is handled
// in the agent side.
if self.passfd_io.is_none() {