mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 15:02:45 +00:00
agent: exit from exec hangs if background process is present
This is the Rust port of https://github.com/kata-containers/agent/pull/371. `read_stdout`/`read_stderr` are blocking RPC calls: if the exec process has exited, these calls stay blocked reading the process's term master fd and never get a chance to break out of the wait. With this PR, `read_stdout`/`read_stderr` no longer read directly from the term master of a process; instead, they first obtain the fd to read from via the newly added `epoller.poll()`. `epoller.poll()` may return: - the term master fd of the exec process, if the process is running. - a pipe fd that returns EOF when read, indicating that the process has exited. Fixes: #1160 Signed-off-by: bin liu <bin@hyper.sh>
This commit is contained in:
parent
1dd77e204f
commit
1ca415d87e
11
src/agent/Cargo.lock
generated
11
src/agent/Cargo.lock
generated
@ -248,6 +248,16 @@ version = "1.6.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "epoll"
|
||||||
|
version = "4.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "errno"
|
name = "errno"
|
||||||
version = "0.2.6"
|
version = "0.2.6"
|
||||||
@ -921,6 +931,7 @@ dependencies = [
|
|||||||
"caps",
|
"caps",
|
||||||
"cgroups",
|
"cgroups",
|
||||||
"dirs",
|
"dirs",
|
||||||
|
"epoll",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
"nix 0.17.0",
|
"nix 0.17.0",
|
||||||
|
@ -26,6 +26,7 @@ dirs = "3.0.1"
|
|||||||
anyhow = "1.0.32"
|
anyhow = "1.0.32"
|
||||||
cgroups = { git = "https://github.com/kata-containers/cgroups-rs", branch = "stable-0.1.1"}
|
cgroups = { git = "https://github.com/kata-containers/cgroups-rs", branch = "stable-0.1.1"}
|
||||||
tempfile = "3.1.0"
|
tempfile = "3.1.0"
|
||||||
|
epoll = "4.3.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = "0.5.0"
|
serial_test = "0.5.0"
|
||||||
|
@ -41,6 +41,7 @@ pub mod cgroups;
|
|||||||
pub mod container;
|
pub mod container;
|
||||||
pub mod mount;
|
pub mod mount;
|
||||||
pub mod process;
|
pub mod process;
|
||||||
|
pub mod reaper;
|
||||||
pub mod specconv;
|
pub mod specconv;
|
||||||
pub mod sync;
|
pub mod sync;
|
||||||
pub mod validator;
|
pub mod validator;
|
||||||
|
@ -14,6 +14,7 @@ use nix::sys::wait::{self, WaitStatus};
|
|||||||
use nix::unistd::{self, Pid};
|
use nix::unistd::{self, Pid};
|
||||||
use nix::Result;
|
use nix::Result;
|
||||||
|
|
||||||
|
use crate::reaper::Epoller;
|
||||||
use oci::Process as OCIProcess;
|
use oci::Process as OCIProcess;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
|
|
||||||
@ -40,6 +41,7 @@ pub struct Process {
|
|||||||
pub exit_watchers: Vec<Sender<i32>>,
|
pub exit_watchers: Vec<Sender<i32>>,
|
||||||
pub oci: OCIProcess,
|
pub oci: OCIProcess,
|
||||||
pub logger: Logger,
|
pub logger: Logger,
|
||||||
|
pub epoller: Option<Epoller>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ProcessOperations {
|
pub trait ProcessOperations {
|
||||||
@ -91,6 +93,7 @@ impl Process {
|
|||||||
exit_watchers: Vec::new(),
|
exit_watchers: Vec::new(),
|
||||||
oci: ocip.clone(),
|
oci: ocip.clone(),
|
||||||
logger: logger.clone(),
|
logger: logger.clone(),
|
||||||
|
epoller: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
info!(logger, "before create console socket!");
|
info!(logger, "before create console socket!");
|
||||||
@ -112,6 +115,29 @@ impl Process {
|
|||||||
}
|
}
|
||||||
Ok(p)
|
Ok(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn close_epoller(&mut self) {
|
||||||
|
if let Some(epoller) = self.epoller.take() {
|
||||||
|
epoller.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_epoller(&mut self) -> anyhow::Result<()> {
|
||||||
|
match self.term_master {
|
||||||
|
Some(term_master) => {
|
||||||
|
// add epoller to process
|
||||||
|
let epoller = Epoller::new(&self.logger, term_master)?;
|
||||||
|
self.epoller = Some(epoller)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
info!(
|
||||||
|
self.logger,
|
||||||
|
"try to add epoller to a process without a term master fd"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> {
|
fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> {
|
||||||
|
150
src/agent/rustjail/src/reaper.rs
Normal file
150
src/agent/rustjail/src/reaper.rs
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
// Copyright (c) 2020 Ant Group
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
||||||
|
|
||||||
|
use nix::fcntl::OFlag;
|
||||||
|
use slog::Logger;
|
||||||
|
|
||||||
|
use nix::unistd;
|
||||||
|
use std::os::unix::io::RawFd;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
const MAX_EVENTS: usize = 2;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Epoller {
|
||||||
|
logger: Logger,
|
||||||
|
epoll_fd: RawFd,
|
||||||
|
// rfd and wfd are a pipe's files two ends, this pipe is
|
||||||
|
// used to sync between the readStdio and the process exits.
|
||||||
|
// once the process exits, it will close one end to notify
|
||||||
|
// the readStdio that the process has exited and it should not
|
||||||
|
// wait on the process's terminal which has been inherited
|
||||||
|
// by it's children and hasn't exited.
|
||||||
|
rfd: RawFd,
|
||||||
|
wfd: RawFd,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Epoller {
|
||||||
|
pub fn new(logger: &Logger, fd: RawFd) -> Result<Epoller> {
|
||||||
|
let epoll_fd = epoll::create(true)?;
|
||||||
|
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
|
||||||
|
|
||||||
|
let mut epoller = Self {
|
||||||
|
logger: logger.clone(),
|
||||||
|
epoll_fd,
|
||||||
|
rfd,
|
||||||
|
wfd,
|
||||||
|
};
|
||||||
|
|
||||||
|
epoller.add(rfd)?;
|
||||||
|
epoller.add(fd)?;
|
||||||
|
|
||||||
|
Ok(epoller)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close_wfd(&self) {
|
||||||
|
let _ = unistd::close(self.wfd);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(&self) {
|
||||||
|
let _ = unistd::close(self.rfd);
|
||||||
|
let _ = unistd::close(self.wfd);
|
||||||
|
let _ = unistd::close(self.epoll_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add(&mut self, fd: RawFd) -> Result<()> {
|
||||||
|
info!(self.logger, "Epoller add fd {}", fd);
|
||||||
|
// add creates an epoll which is used to monitor the process's pty's master and
|
||||||
|
// one end of its exit notify pipe. Those files will be registered with level-triggered
|
||||||
|
// notification.
|
||||||
|
epoll::ctl(
|
||||||
|
self.epoll_fd,
|
||||||
|
epoll::ControlOptions::EPOLL_CTL_ADD,
|
||||||
|
fd,
|
||||||
|
epoll::Event::new(
|
||||||
|
epoll::Events::EPOLLHUP
|
||||||
|
| epoll::Events::EPOLLIN
|
||||||
|
| epoll::Events::EPOLLERR
|
||||||
|
| epoll::Events::EPOLLRDHUP,
|
||||||
|
fd as u64,
|
||||||
|
),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// There will be three cases on the epoller once it poll:
|
||||||
|
// a: only pty's master get an event(other than self.rfd);
|
||||||
|
// b: only the pipe get an event(self.rfd);
|
||||||
|
// c: both of pty and pipe have event occur;
|
||||||
|
// for case a, it means there is output in process's terminal and what needed to do is
|
||||||
|
// just read the terminal and send them out; for case b, it means the process has exited
|
||||||
|
// and there is no data in the terminal, thus just return the "EOF" to end the io;
|
||||||
|
// for case c, it means the process has exited but there is some data in the terminal which
|
||||||
|
// hasn't been send out, thus it should send those data out first and then send "EOF" last to
|
||||||
|
// end the io.
|
||||||
|
pub fn poll(&self) -> Result<RawFd> {
|
||||||
|
let mut rfd = self.rfd;
|
||||||
|
let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); MAX_EVENTS];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let event_count = match epoll::wait(self.epoll_fd, -1, epoll_events.as_mut_slice()) {
|
||||||
|
Ok(ec) => ec,
|
||||||
|
Err(e) => {
|
||||||
|
info!(self.logger, "loop wait err {:?}", e);
|
||||||
|
// EINTR: The call was interrupted by a signal handler before either
|
||||||
|
// any of the requested events occurred or the timeout expired
|
||||||
|
if e.kind() == std::io::ErrorKind::Interrupted {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for event in epoll_events.iter().take(event_count) {
|
||||||
|
let fd = event.data as i32;
|
||||||
|
// fd has been assigned with one end of process's exited pipe by default, and
|
||||||
|
// here to check is there any event occur on process's terminal, if "yes", it
|
||||||
|
// should be dealt first, otherwise, it means the process has exited and there
|
||||||
|
// is nothing left in the process's terminal needed to be read.
|
||||||
|
if fd != rfd {
|
||||||
|
rfd = fd;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(rfd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::Epoller;
    use nix::fcntl::OFlag;
    use nix::unistd;
    use std::thread;

    /// Data written to the watched pipe must make `poll()` return that pipe's
    /// read end rather than the epoller's internal notify pipe.
    #[test]
    fn test_epoller_poll() {
        let logger = slog::Logger::root(slog::Discard, o!());
        let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap();
        let epoller = Epoller::new(&logger, rfd).unwrap();

        let child =
            thread::spawn(move || unistd::write(wfd, "temporary file's content".as_bytes()));

        // wait write to finish; fail fast if it did not succeed, because
        // poll() waits without a timeout and would otherwise hang the test.
        let written = child.join().expect("writer thread panicked");
        assert!(written.is_ok(), "write to pipe failed: {:?}", written);

        let fd = epoller.poll().unwrap();
        assert_eq!(fd, rfd, "Should get rfd");

        epoller.close();

        // The epoller does not own the watched pipe; release it here.
        let _ = unistd::close(rfd);
        let _ = unistd::close(wfd);
    }
}
|
@ -306,6 +306,7 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
|||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status));
|
||||||
|
|
||||||
let pid = wait_status.pid();
|
let pid = wait_status.pid();
|
||||||
if let Some(pid) = pid {
|
if let Some(pid) = pid {
|
||||||
@ -342,6 +343,13 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
|||||||
|
|
||||||
p.exit_code = ret;
|
p.exit_code = ret;
|
||||||
let _ = unistd::close(pipe_write);
|
let _ = unistd::close(pipe_write);
|
||||||
|
|
||||||
|
if let Some(ref poller) = p.epoller {
|
||||||
|
info!(logger, "close epoller");
|
||||||
|
// close the socket file to notify readStdio to close terminal specifically
|
||||||
|
// in case this process's terminal has been inherited by its children.
|
||||||
|
poller.close_wfd()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ use protocols::types::Interface;
|
|||||||
use rustjail::cgroups::notifier;
|
use rustjail::cgroups::notifier;
|
||||||
use rustjail::container::{BaseContainer, Container, LinuxContainer};
|
use rustjail::container::{BaseContainer, Container, LinuxContainer};
|
||||||
use rustjail::process::Process;
|
use rustjail::process::Process;
|
||||||
|
use rustjail::reaper;
|
||||||
use rustjail::specconv::CreateOpts;
|
use rustjail::specconv::CreateOpts;
|
||||||
|
|
||||||
use nix::errno::Errno;
|
use nix::errno::Errno;
|
||||||
@ -189,10 +190,14 @@ impl agentService {
|
|||||||
let cg_path = ctr.cgroup_manager.as_ref().unwrap().get_cg_path("memory");
|
let cg_path = ctr.cgroup_manager.as_ref().unwrap().get_cg_path("memory");
|
||||||
if cg_path.is_some() {
|
if cg_path.is_some() {
|
||||||
let rx = notifier::notify_oom(cid.as_str(), cg_path.unwrap())?;
|
let rx = notifier::notify_oom(cid.as_str(), cg_path.unwrap())?;
|
||||||
s.run_oom_event_monitor(rx, cid);
|
s.run_oom_event_monitor(rx, cid.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set epoller
|
||||||
|
let p = find_process(&mut s, cid.as_str(), "", true)?;
|
||||||
|
p.create_epoller()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -272,7 +277,7 @@ impl agentService {
|
|||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let exec_id = req.exec_id.clone();
|
let exec_id = req.exec_id.clone();
|
||||||
|
|
||||||
info!(sl!(), "cid: {} eid: {}", cid, exec_id);
|
info!(sl!(), "do_exec_process cid: {} eid: {}", cid, exec_id);
|
||||||
|
|
||||||
let s = self.sandbox.clone();
|
let s = self.sandbox.clone();
|
||||||
let mut sandbox = s.lock().unwrap();
|
let mut sandbox = s.lock().unwrap();
|
||||||
@ -293,6 +298,10 @@ impl agentService {
|
|||||||
|
|
||||||
ctr.run(p)?;
|
ctr.run(p)?;
|
||||||
|
|
||||||
|
// set epoller
|
||||||
|
let p = find_process(&mut sandbox, cid.as_str(), exec_id.as_str(), false)?;
|
||||||
|
p.create_epoller()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,6 +412,8 @@ impl agentService {
|
|||||||
let _ = unistd::close(p.exit_pipe_r.unwrap());
|
let _ = unistd::close(p.exit_pipe_r.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.close_epoller();
|
||||||
|
|
||||||
p.parent_stdin = None;
|
p.parent_stdin = None;
|
||||||
p.parent_stdout = None;
|
p.parent_stdout = None;
|
||||||
p.parent_stderr = None;
|
p.parent_stderr = None;
|
||||||
@ -427,13 +438,6 @@ impl agentService {
|
|||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let eid = req.exec_id.clone();
|
let eid = req.exec_id.clone();
|
||||||
|
|
||||||
info!(
|
|
||||||
sl!(),
|
|
||||||
"write stdin";
|
|
||||||
"container-id" => cid.clone(),
|
|
||||||
"exec-id" => eid.clone()
|
|
||||||
);
|
|
||||||
|
|
||||||
let s = self.sandbox.clone();
|
let s = self.sandbox.clone();
|
||||||
let mut sandbox = s.lock().unwrap();
|
let mut sandbox = s.lock().unwrap();
|
||||||
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
||||||
@ -477,6 +481,7 @@ impl agentService {
|
|||||||
let eid = req.exec_id;
|
let eid = req.exec_id;
|
||||||
|
|
||||||
let mut fd: RawFd = -1;
|
let mut fd: RawFd = -1;
|
||||||
|
let mut epoller: Option<reaper::Epoller> = None;
|
||||||
{
|
{
|
||||||
let s = self.sandbox.clone();
|
let s = self.sandbox.clone();
|
||||||
let mut sandbox = s.lock().unwrap();
|
let mut sandbox = s.lock().unwrap();
|
||||||
@ -485,6 +490,7 @@ impl agentService {
|
|||||||
|
|
||||||
if p.term_master.is_some() {
|
if p.term_master.is_some() {
|
||||||
fd = p.term_master.unwrap();
|
fd = p.term_master.unwrap();
|
||||||
|
epoller = p.epoller.clone();
|
||||||
} else if stdout {
|
} else if stdout {
|
||||||
if p.parent_stdout.is_some() {
|
if p.parent_stdout.is_some() {
|
||||||
fd = p.parent_stdout.unwrap();
|
fd = p.parent_stdout.unwrap();
|
||||||
@ -494,6 +500,17 @@ impl agentService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(epoller) = epoller {
|
||||||
|
// The process's epoller's poll() will return a file descriptor of the process's
|
||||||
|
// terminal or one end of its exited pipe. If it returns its terminal, it means
|
||||||
|
// there is data needed to be read out or it has been closed; if it returns the
|
||||||
|
// process's exited pipe, it means the process has exited and there is no data
|
||||||
|
// needed to be read out in its terminal, thus following read on it will read out
|
||||||
|
// "EOF" to terminate this process's io since the other end of this pipe has been
|
||||||
|
// closed in reap().
|
||||||
|
fd = epoller.poll()?;
|
||||||
|
}
|
||||||
|
|
||||||
if fd == -1 {
|
if fd == -1 {
|
||||||
return Err(anyhow!(nix::Error::from_errno(nix::errno::Errno::EINVAL)));
|
return Err(anyhow!(nix::Error::from_errno(nix::errno::Errno::EINVAL)));
|
||||||
}
|
}
|
||||||
@ -808,6 +825,8 @@ impl protocols::agent_ttrpc::AgentService for agentService {
|
|||||||
p.parent_stdin = None;
|
p.parent_stdin = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.close_epoller();
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user