Merge pull request #9335 from Tim-Zhang/fix-passfd-fifo-open

passfd-io: fix FIFO opening and vsock handling
This commit is contained in:
Fupan Li
2024-04-23 09:04:45 +08:00
committed by GitHub
7 changed files with 133 additions and 123 deletions

View File

@@ -20,9 +20,9 @@ async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result<u32> {
let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream.write_all(b"passfd\n").await.context("write all")?;
// We want the io connection keep connected when the containerd closed the io pipe,
// thus it can be attached on the io stream.
let buf = format!("{} keep", port);
// Since we have already keep stdin_w/stdout_r/stderr_r, "keep" of passfd is no longer needed.
// Also, we can't keep connection here or the stdin would be stuck.
let buf = format!("{}", port);
stream
.send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()])
.context("send port and fd")?;
@@ -78,16 +78,9 @@ impl PassfdIo {
passfd_port: u32,
terminal: bool,
) -> Result<()> {
// In linux, when a FIFO is opened and there are no writers, the reader
// will continuously receive the HUP event. This can be problematic
// when creating containers in detached mode, as the stdin FIFO writer
// is closed after the container is created, resulting in this situation.
//
// See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll
if let Some(stdin) = &self.stdin {
let fin = OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(stdin)
.context("open stdin")?;

View File

@@ -6,21 +6,20 @@
use std::{
io,
os::unix::{
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
os::{
fd::IntoRawFd,
unix::{
fs::OpenOptionsExt,
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
},
},
pin::Pin,
task::Context as TaskContext,
task::Poll,
task::{Context as TaskContext, Poll},
};
use anyhow::{anyhow, Context, Result};
use nix::{
fcntl::{self, OFlag},
sys::stat::Mode,
};
use tokio::{
fs::OpenOptions,
io::{AsyncRead, AsyncWrite},
@@ -28,13 +27,16 @@ use tokio::{
};
use url::Url;
fn open_fifo(path: &str) -> Result<AsyncUnixStream> {
let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?;
fn open_fifo_write(path: &str) -> Result<AsyncUnixStream> {
let std_file = std::fs::OpenOptions::new()
.write(true)
// It's not for non-block openning FIFO but for non-block stream which
// will be add into tokio runtime.
.custom_flags(libc::O_NONBLOCK)
.open(path)
.with_context(|| format!("open {} with write", path))?;
let fd = std_file.into_raw_fd();
let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) };
std_stream
.set_nonblocking(true)
.context("set nonblocking")?;
AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e))
}
@@ -67,13 +69,10 @@ impl ShimIo {
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
info!(sl!(), "open stdin {:?}", &stdin);
// Since the stdin peer point (which is hold by containerd) could not be openned
// immediately, which would block here's open with block mode, and we wouldn't want to
// block here, thus here opened with nonblock and then reset it to block mode for
// tokio async io.
// Since we had opened the stdin as write mode in the Process::new function,
// thus it wouldn't be blocked to open it as read mode.
match OpenOptions::new()
.read(true)
.write(false)
.custom_flags(libc::O_NONBLOCK)
.open(&stdin)
.await
@@ -118,7 +117,7 @@ impl ShimIo {
if let Some(url) = url {
if url.scheme() == "fifo" {
let path = url.path();
match open_fifo(path) {
match open_fifo_write(path) {
Ok(s) => {
return Some(Box::new(ShimIoWrite::Stream(s)));
}

View File

@@ -5,7 +5,7 @@
//
use std::collections::HashMap;
use std::sync::Arc;
use std::{fs::File, os::unix::fs::OpenOptionsExt, sync::Arc};
use agent::Agent;
use anyhow::{Context, Result};
@@ -33,6 +33,26 @@ pub struct Process {
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
// In linux, when a FIFO is opened and there are no writers, the reader
// will continuously receive the HUP event. This can be problematic.
// To avoid this problem, we open stdin in write mode and keep the stdin-writer
pub stdin_w: Option<File>,
// We need to open the stdout as the read mode and keep the open endpoint
// until the process is delete. otherwise,
// the process would exit before the containerd side open and read
// the stdout fifo, thus Kata would write all of the stdout contents into
// the stdout fifo and then closed the write endpoint. Then, containerd
// open the stdout fifo and try to read, since the write side had closed,
// thus containerd would block on the read forever.
// Here we keep the stdout/stderr read endpoint File in the process struct,
// which would be destroied when containerd send the delete rpc call,
// at this time the containerd had waited the stdout read return, thus it
// can make sure the contents in the stdout/stderr fifo wouldn't be lost.
pub stdout_r: Option<File>,
// The purpose is the same as stdout_r
pub stderr_r: Option<File>,
pub terminal: bool,
pub height: u32,
@@ -51,6 +71,24 @@ pub struct Process {
pub passfd_io: Option<PassfdIo>,
}
fn open_fifo(path: &str, is_read: bool, is_write: bool) -> Result<File> {
let file = std::fs::OpenOptions::new()
.read(is_read)
.write(is_write)
.custom_flags(libc::O_NONBLOCK)
.open(path)?;
Ok(file)
}
fn open_fifo_read(path: &str) -> Result<File> {
open_fifo(path, true, false)
}
fn open_fifo_write(path: &str) -> Result<File> {
open_fifo(path, false, true)
}
impl Process {
pub fn new(
process: &ContainerProcess,
@@ -71,6 +109,9 @@ impl Process {
stdin,
stdout,
stderr,
stdin_w: None,
stdout_r: None,
stderr_r: None,
terminal,
height: 0,
width: 0,
@@ -83,6 +124,25 @@ impl Process {
}
}
pub fn pre_fifos_open(&mut self) -> Result<()> {
if let Some(ref stdout) = self.stdout {
self.stdout_r = Some(open_fifo_read(stdout)?);
}
if let Some(ref stderr) = self.stderr {
self.stderr_r = Some(open_fifo_read(stderr)?);
}
Ok(())
}
pub fn post_fifos_open(&mut self) -> Result<()> {
if let Some(ref stdin) = self.stdin {
self.stdin_w = Some(open_fifo_write(stdin)?);
}
Ok(())
}
/// Init the `passfd_io` struct and vsock connections for io to the agent.
pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> {
info!(self.logger, "passfd io init");
@@ -90,10 +150,12 @@ impl Process {
let mut passfd_io =
PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await;
self.pre_fifos_open()?;
passfd_io
.open_and_passfd(hvsock_uds_path, passfd_port, self.terminal)
.await
.context("passfd connect")?;
self.post_fifos_open()?;
self.passfd_io = Some(passfd_io);
@@ -176,10 +238,12 @@ impl Process {
) -> Result<()> {
info!(self.logger, "start io and wait");
self.pre_fifos_open()?;
// new shim io
let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr)
.await
.context("new shim io")?;
self.post_fifos_open()?;
// start io copy for stdin
let wgw_stdin = self.wg_stdin.worker();
@@ -337,6 +401,10 @@ impl Process {
/// Close the stdin of the process in container.
pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
// Close the stdin writer keeper so that
// the end signal could be received in the read side
self.stdin_w.take();
// In passfd io mode, the stdin close and sync logic is handled
// in the agent side.
if self.passfd_io.is_none() {