diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 6b01c3b12e..2b3a7a4b70 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -65,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY use crate::sync_with_async::{read_async, write_async}; use async_trait::async_trait; use rlimit::{setrlimit, Resource, Rlim}; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncBufReadExt; use tokio::sync::Mutex; use kata_sys_util::hooks::HookStates; @@ -1002,38 +1002,12 @@ impl BaseContainer for LinuxContainer { // Copy from stdin to term_master if let Some(mut stdin_stream) = proc_io.stdin.take() { let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; - let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); - let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); let term_closer = term_closer.clone(); tokio::spawn(async move { - let mut buf = [0u8; 8192]; - loop { - tokio::select! { - // Make sure stdin_stream is drained before exiting - biased; - res = stdin_stream.read(&mut buf) => { - match res { - Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); - break; - } - Ok(n) => { - if term_master.write_all(&buf[..n]).await.is_err() { - break; - } - } - } - } - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin fifo here when explicit requested. - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); - break - } - } - } - wgw_input.done(); + let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; + debug!(logger, "copy from stdin to term_master end: {:?}", res); + std::mem::forget(term_master); // Avoid auto closing of term_master drop(term_closer); }); @@ -1070,37 +1044,10 @@ impl BaseContainer for LinuxContainer { if let Some(mut stdin_stream) = proc_io.stdin.take() { debug!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; - let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); - let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); tokio::spawn(async move { - let mut buf = [0u8; 8192]; - loop { - tokio::select! { - // Make sure stdin_stream is drained before exiting - biased; - res = stdin_stream.read(&mut buf) => { - match res { - Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); - break; - } - Ok(n) => { - if parent_stdin.write_all(&buf[..n]).await.is_err() { - break; - } - } - } - } - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin fifo here when explicit requested. - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); - break - } - } - } - wgw_input.done(); + let res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin).await; + debug!(logger, "copy from stdin to term_master end: {:?}", res); }); } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index efc99afd39..853123cbcf 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -54,11 +54,6 @@ pub struct ProcessIo { pub stdin: Option, pub stdout: Option, pub stderr: Option, - // used to close stdin stream - pub close_stdin_tx: tokio::sync::watch::Sender, - pub close_stdin_rx: tokio::sync::watch::Receiver, - // wait for stdin copy task to finish - pub wg_input: WaitGroup, // used to wait for all process outputs to be copied to the vsock streams // only used when tty is used. pub wg_output: WaitGroup, @@ -70,15 +65,10 @@ impl ProcessIo { stdout: Option, stderr: Option, ) -> Self { - let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false); - ProcessIo { stdin, stdout, stderr, - close_stdin_tx, - close_stdin_rx, - wg_input: WaitGroup::new(), wg_output: WaitGroup::new(), } } @@ -210,11 +200,9 @@ impl Process { } pub async fn close_stdin(&mut self) { - if let Some(proc_io) = &mut self.proc_io { - // notify io copy task to close stdin stream - let _ = proc_io.close_stdin_tx.send(true); - // wait for io copy task to finish - proc_io.wg_input.wait().await; + // stdin will be closed automatically in passfd-io senario + if self.proc_io.is_some() { + return; } close_process_stream!(self, term_master, TermMaster); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index d4330c1ab8..aa598bc7ca 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -179,6 +179,14 @@ impl AgentService { &self, req: protocols::agent::CreateContainerRequest, ) -> Result<()> { + // create the proc_io first, in case there's some error occur below, thus we can make sure + // the io stream closed when error occur. + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) + } else { + None + }; + let cid = req.container_id.clone(); kata_sys_util::validate::verify_id(&cid)?; @@ -270,14 +278,6 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let p = if let Some(p) = oci.process { - let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { - Some( - passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port) - .await, - ) - } else { - None - }; Process::new(&sl(), &p, cid.as_str(), true, pipe_size, proc_io)? } else { info!(sl(), "no process configurations!"); @@ -376,6 +376,14 @@ impl AgentService { info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id); + // create the proc_io first, in case there's some error occur below, thus we can make sure + // the io stream closed when error occur. + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) + } else { + None + }; + let mut sandbox = self.sandbox.lock().await; let mut process = req .process @@ -388,13 +396,6 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); - // passfd_listener_port != 0 indicates passfd io mode - let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { - Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) - } else { - None - }; - let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size, proc_io)?; let ctr = sandbox diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index 0e27a36a49..438aa800d1 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -460,7 +460,8 @@ impl VsockEpollListener for VsockConnection { /// Notify the connection about an event (or set of events) that it was /// interested in. fn notify(&mut self, evset: epoll::Events) { - if evset.contains(epoll::Events::EPOLLIN) { + // EPOLLHUP also needs to be read and will be got len 0 + if evset.contains(epoll::Events::EPOLLIN) || evset.contains(epoll::Events::EPOLLHUP) { // Data can be read from the host stream. Setting a Rw pending // indication, so that the muxer will know to call `recv_pkt()` // later. @@ -514,6 +515,19 @@ impl VsockEpollListener for VsockConnection { self.pending_rx.insert(PendingRx::CreditUpdate); } } + + // The host stream has encountered an error. We'll kill this + // connection. + if evset.contains(epoll::Events::EPOLLERR) + && !evset.contains(epoll::Events::EPOLLIN) + && !evset.contains(epoll::Events::EPOLLOUT) + { + warn!( + "vsock: connection received EPOLLERR event: lp={}, pp={}", + self.local_port, self.peer_port + ); + self.kill(); + } } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 169c3b68d0..bd5e97e453 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -20,9 +20,9 @@ async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result { let mut stream = UnixStream::connect(&uds).await.context("connect")?; stream.write_all(b"passfd\n").await.context("write all")?; - // We want the io connection keep connected when the containerd closed the io pipe, - // thus it can be attached on the io stream. - let buf = format!("{} keep", port); + // Since we have already keep stdin_w/stdout_r/stderr_r, "keep" of passfd is no longer needed. + // Also, we can't keep connection here or the stdin would be stuck. + let buf = format!("{}", port); stream .send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()]) .context("send port and fd")?; @@ -78,16 +78,9 @@ impl PassfdIo { passfd_port: u32, terminal: bool, ) -> Result<()> { - // In linux, when a FIFO is opened and there are no writers, the reader - // will continuously receive the HUP event. This can be problematic - // when creating containers in detached mode, as the stdin FIFO writer - // is closed after the container is created, resulting in this situation. - // - // See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll if let Some(stdin) = &self.stdin { let fin = OpenOptions::new() .read(true) - .write(true) .custom_flags(libc::O_NONBLOCK) .open(stdin) .context("open stdin")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs index db3ce99891..f6fdfcb81f 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs @@ -6,21 +6,20 @@ use std::{ io, - os::unix::{ - io::{FromRawFd, RawFd}, - net::UnixStream as StdUnixStream, - prelude::AsRawFd, + os::{ + fd::IntoRawFd, + unix::{ + fs::OpenOptionsExt, + io::{FromRawFd, RawFd}, + net::UnixStream as StdUnixStream, + prelude::AsRawFd, + }, }, pin::Pin, - task::Context as TaskContext, - task::Poll, + task::{Context as TaskContext, Poll}, }; use anyhow::{anyhow, Context, Result}; -use nix::{ - fcntl::{self, OFlag}, - sys::stat::Mode, -}; use tokio::{ fs::OpenOptions, io::{AsyncRead, AsyncWrite}, @@ -28,13 +27,16 @@ use tokio::{ }; use url::Url; -fn open_fifo(path: &str) -> Result { - let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?; - +fn open_fifo_write(path: &str) -> Result { + let std_file = std::fs::OpenOptions::new() + .write(true) + // It's not for non-block openning FIFO but for non-block stream which + // will be add into tokio runtime. + .custom_flags(libc::O_NONBLOCK) + .open(path) + .with_context(|| format!("open {} with write", path))?; + let fd = std_file.into_raw_fd(); let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) }; - std_stream - .set_nonblocking(true) - .context("set nonblocking")?; AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e)) } @@ -67,13 +69,10 @@ impl ShimIo { let stdin_fd: Option> = if let Some(stdin) = stdin { info!(sl!(), "open stdin {:?}", &stdin); - // Since the stdin peer point (which is hold by containerd) could not be openned - // immediately, which would block here's open with block mode, and we wouldn't want to - // block here, thus here opened with nonblock and then reset it to block mode for - // tokio async io. + // Since we had opened the stdin as write mode in the Process::new function, + // thus it wouldn't be blocked to open it as read mode. match OpenOptions::new() .read(true) - .write(false) .custom_flags(libc::O_NONBLOCK) .open(&stdin) .await @@ -118,7 +117,7 @@ impl ShimIo { if let Some(url) = url { if url.scheme() == "fifo" { let path = url.path(); - match open_fifo(path) { + match open_fifo_write(path) { Ok(s) => { return Some(Box::new(ShimIoWrite::Stream(s))); } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 6cc7557702..34e9a9a60b 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -5,7 +5,7 @@ // use std::collections::HashMap; -use std::sync::Arc; +use std::{fs::File, os::unix::fs::OpenOptionsExt, sync::Arc}; use agent::Agent; use anyhow::{Context, Result}; @@ -33,6 +33,26 @@ pub struct Process { pub stdin: Option, pub stdout: Option, pub stderr: Option, + + // In linux, when a FIFO is opened and there are no writers, the reader + // will continuously receive the HUP event. This can be problematic. + // To avoid this problem, we open stdin in write mode and keep the stdin-writer + pub stdin_w: Option, + // We need to open the stdout as the read mode and keep the open endpoint + // until the process is delete. otherwise, + // the process would exit before the containerd side open and read + // the stdout fifo, thus Kata would write all of the stdout contents into + // the stdout fifo and then closed the write endpoint. Then, containerd + // open the stdout fifo and try to read, since the write side had closed, + // thus containerd would block on the read forever. + // Here we keep the stdout/stderr read endpoint File in the process struct, + // which would be destroied when containerd send the delete rpc call, + // at this time the containerd had waited the stdout read return, thus it + // can make sure the contents in the stdout/stderr fifo wouldn't be lost. + pub stdout_r: Option, + // The purpose is the same as stdout_r + pub stderr_r: Option, + pub terminal: bool, pub height: u32, @@ -51,6 +71,24 @@ pub struct Process { pub passfd_io: Option, } +fn open_fifo(path: &str, is_read: bool, is_write: bool) -> Result { + let file = std::fs::OpenOptions::new() + .read(is_read) + .write(is_write) + .custom_flags(libc::O_NONBLOCK) + .open(path)?; + + Ok(file) +} + +fn open_fifo_read(path: &str) -> Result { + open_fifo(path, true, false) +} + +fn open_fifo_write(path: &str) -> Result { + open_fifo(path, false, true) +} + impl Process { pub fn new( process: &ContainerProcess, @@ -71,6 +109,9 @@ impl Process { stdin, stdout, stderr, + stdin_w: None, + stdout_r: None, + stderr_r: None, terminal, height: 0, width: 0, @@ -83,6 +124,25 @@ impl Process { } } + pub fn pre_fifos_open(&mut self) -> Result<()> { + if let Some(ref stdout) = self.stdout { + self.stdout_r = Some(open_fifo_read(stdout)?); + } + + if let Some(ref stderr) = self.stderr { + self.stderr_r = Some(open_fifo_read(stderr)?); + } + + Ok(()) + } + + pub fn post_fifos_open(&mut self) -> Result<()> { + if let Some(ref stdin) = self.stdin { + self.stdin_w = Some(open_fifo_write(stdin)?); + } + Ok(()) + } + /// Init the `passfd_io` struct and vsock connections for io to the agent. pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> { info!(self.logger, "passfd io init"); @@ -90,10 +150,12 @@ impl Process { let mut passfd_io = PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await; + self.pre_fifos_open()?; passfd_io .open_and_passfd(hvsock_uds_path, passfd_port, self.terminal) .await .context("passfd connect")?; + self.post_fifos_open()?; self.passfd_io = Some(passfd_io); @@ -176,10 +238,12 @@ impl Process { ) -> Result<()> { info!(self.logger, "start io and wait"); + self.pre_fifos_open()?; // new shim io let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr) .await .context("new shim io")?; + self.post_fifos_open()?; // start io copy for stdin let wgw_stdin = self.wg_stdin.worker(); @@ -337,6 +401,10 @@ impl Process { /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { + // Close the stdin writer keeper so that + // the end signal could be received in the read side + self.stdin_w.take(); + // In passfd io mode, the stdin close and sync logic is handled // in the agent side. if self.passfd_io.is_none() {