diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index a8cf99d9e5..efe2c26fca 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -248,6 +248,16 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "epoll" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "errno" version = "0.2.6" @@ -921,6 +931,7 @@ dependencies = [ "caps", "cgroups", "dirs", + "epoll", "lazy_static", "libc", "nix 0.17.0", diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index 72b9e9bf69..b4afbd16cd 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -26,6 +26,7 @@ dirs = "3.0.1" anyhow = "1.0.32" cgroups = { git = "https://github.com/kata-containers/cgroups-rs", branch = "stable-0.1.1"} tempfile = "3.1.0" +epoll = "4.3.1" [dev-dependencies] serial_test = "0.5.0" diff --git a/src/agent/rustjail/src/lib.rs b/src/agent/rustjail/src/lib.rs index 63dc77046f..50f82c646e 100644 --- a/src/agent/rustjail/src/lib.rs +++ b/src/agent/rustjail/src/lib.rs @@ -41,6 +41,7 @@ pub mod cgroups; pub mod container; pub mod mount; pub mod process; +pub mod reaper; pub mod specconv; pub mod sync; pub mod validator; diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index daba20e5f5..8f2dd9c131 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -14,6 +14,7 @@ use nix::sys::wait::{self, WaitStatus}; use nix::unistd::{self, Pid}; use nix::Result; +use crate::reaper::Epoller; use oci::Process as OCIProcess; use slog::Logger; @@ -40,6 +41,7 @@ pub struct Process { pub exit_watchers: Vec>, pub oci: OCIProcess, pub logger: Logger, + pub epoller: Option, } pub trait ProcessOperations { @@ -91,6 +93,7 @@ impl Process { exit_watchers: Vec::new(), oci: ocip.clone(), logger: logger.clone(), + epoller: None, }; info!(logger, "before create console socket!"); @@ -112,6 +115,29 @@ impl Process { } Ok(p) } + + pub fn close_epoller(&mut self) { + if let Some(epoller) = self.epoller.take() { + epoller.close(); + } + } + + pub fn create_epoller(&mut self) -> anyhow::Result<()> { + match self.term_master { + Some(term_master) => { + // add epoller to process + let epoller = Epoller::new(&self.logger, term_master)?; + self.epoller = Some(epoller) + } + None => { + info!( + self.logger, + "try to add epoller to a process without a term master fd" + ); + } + } + Ok(()) + } } fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> { diff --git a/src/agent/rustjail/src/reaper.rs b/src/agent/rustjail/src/reaper.rs new file mode 100644 index 0000000000..91b4d0e066 --- /dev/null +++ b/src/agent/rustjail/src/reaper.rs @@ -0,0 +1,150 @@ +// Copyright (c) 2020 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use nix::fcntl::OFlag; +use slog::Logger; + +use nix::unistd; +use std::os::unix::io::RawFd; + +use anyhow::Result; + +const MAX_EVENTS: usize = 2; + +#[derive(Debug, Clone)] +pub struct Epoller { + logger: Logger, + epoll_fd: RawFd, + // rfd and wfd are a pipe's files two ends, this pipe is + // used to sync between the readStdio and the process exits. + // once the process exits, it will close one end to notify + // the readStdio that the process has exited and it should not + // wait on the process's terminal which has been inherited + // by it's children and hasn't exited. + rfd: RawFd, + wfd: RawFd, +} + +impl Epoller { + pub fn new(logger: &Logger, fd: RawFd) -> Result { + let epoll_fd = epoll::create(true)?; + let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; + + let mut epoller = Self { + logger: logger.clone(), + epoll_fd, + rfd, + wfd, + }; + + epoller.add(rfd)?; + epoller.add(fd)?; + + Ok(epoller) + } + + pub fn close_wfd(&self) { + let _ = unistd::close(self.wfd); + } + + pub fn close(&self) { + let _ = unistd::close(self.rfd); + let _ = unistd::close(self.wfd); + let _ = unistd::close(self.epoll_fd); + } + + fn add(&mut self, fd: RawFd) -> Result<()> { + info!(self.logger, "Epoller add fd {}", fd); + // add creates an epoll which is used to monitor the process's pty's master and + // one end of its exit notify pipe. Those files will be registered with level-triggered + // notification. + epoll::ctl( + self.epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + fd, + epoll::Event::new( + epoll::Events::EPOLLHUP + | epoll::Events::EPOLLIN + | epoll::Events::EPOLLERR + | epoll::Events::EPOLLRDHUP, + fd as u64, + ), + )?; + + Ok(()) + } + + // There will be three cases on the epoller once it poll: + // a: only pty's master get an event(other than self.rfd); + // b: only the pipe get an event(self.rfd); + // c: both of pty and pipe have event occur; + // for case a, it means there is output in process's terminal and what needed to do is + // just read the terminal and send them out; for case b, it means the process has exited + // and there is no data in the terminal, thus just return the "EOF" to end the io; + // for case c, it means the process has exited but there is some data in the terminal which + // hasn't been send out, thus it should send those data out first and then send "EOF" last to + // end the io. + pub fn poll(&self) -> Result { + let mut rfd = self.rfd; + let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); MAX_EVENTS]; + + loop { + let event_count = match epoll::wait(self.epoll_fd, -1, epoll_events.as_mut_slice()) { + Ok(ec) => ec, + Err(e) => { + info!(self.logger, "loop wait err {:?}", e); + // EINTR: The call was interrupted by a signal handler before either + // any of the requested events occurred or the timeout expired + if e.kind() == std::io::ErrorKind::Interrupted { + continue; + } + return Err(e.into()); + } + }; + + for event in epoll_events.iter().take(event_count) { + let fd = event.data as i32; + // fd has been assigned with one end of process's exited pipe by default, and + // here to check is there any event occur on process's terminal, if "yes", it + // should be dealt first, otherwise, it means the process has exited and there + // is nothing left in the process's terminal needed to be read. + if fd != rfd { + rfd = fd; + break; + } + } + break; + } + + Ok(rfd) + } +} + +#[cfg(test)] +mod tests { + use super::Epoller; + use nix::fcntl::OFlag; + use nix::unistd; + use std::thread; + + #[test] + fn test_epoller_poll() { + let logger = slog::Logger::root(slog::Discard, o!()); + let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); + let epoller = Epoller::new(&logger, rfd).unwrap(); + + let child = thread::spawn(move || { + let _ = unistd::write(wfd, "temporary file's content".as_bytes()); + }); + + // wait write to finish + let _ = child.join(); + + let fd = epoller.poll().unwrap(); + assert_eq!(fd, rfd, "Should get rfd"); + + epoller.close(); + } +} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index dddb083ba6..6bd2470287 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -306,6 +306,7 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc>) -> Result continue 'outer; } }; + info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status)); let pid = wait_status.pid(); if let Some(pid) = pid { @@ -342,6 +343,13 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc>) -> Result p.exit_code = ret; let _ = unistd::close(pipe_write); + + if let Some(ref poller) = p.epoller { + info!(logger, "close epoller"); + // close the socket file to notify readStdio to close terminal specifically + // in case this process's terminal has been inherited by its children. + poller.close_wfd() + } } } } diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 03af6d8903..03d45bdfa2 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -24,6 +24,7 @@ use protocols::types::Interface; use rustjail::cgroups::notifier; use rustjail::container::{BaseContainer, Container, LinuxContainer}; use rustjail::process::Process; +use rustjail::reaper; use rustjail::specconv::CreateOpts; use nix::errno::Errno; @@ -189,10 +190,14 @@ impl agentService { let cg_path = ctr.cgroup_manager.as_ref().unwrap().get_cg_path("memory"); if cg_path.is_some() { let rx = notifier::notify_oom(cid.as_str(), cg_path.unwrap())?; - s.run_oom_event_monitor(rx, cid); + s.run_oom_event_monitor(rx, cid.clone()); } } + // set epoller + let p = find_process(&mut s, cid.as_str(), "", true)?; + p.create_epoller()?; + Ok(()) } @@ -272,7 +277,7 @@ impl agentService { let cid = req.container_id.clone(); let exec_id = req.exec_id.clone(); - info!(sl!(), "cid: {} eid: {}", cid, exec_id); + info!(sl!(), "do_exec_process cid: {} eid: {}", cid, exec_id); let s = self.sandbox.clone(); let mut sandbox = s.lock().unwrap(); @@ -293,6 +298,10 @@ impl agentService { ctr.run(p)?; + // set epoller + let p = find_process(&mut sandbox, cid.as_str(), exec_id.as_str(), false)?; + p.create_epoller()?; + Ok(()) } @@ -403,6 +412,8 @@ impl agentService { let _ = unistd::close(p.exit_pipe_r.unwrap()); } + p.close_epoller(); + p.parent_stdin = None; p.parent_stdout = None; p.parent_stderr = None; @@ -427,13 +438,6 @@ impl agentService { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); - info!( - sl!(), - "write stdin"; - "container-id" => cid.clone(), - "exec-id" => eid.clone() - ); - let s = self.sandbox.clone(); let mut sandbox = s.lock().unwrap(); let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?; @@ -477,6 +481,7 @@ impl agentService { let eid = req.exec_id; let mut fd: RawFd = -1; + let mut epoller: Option = None; { let s = self.sandbox.clone(); let mut sandbox = s.lock().unwrap(); @@ -485,6 +490,7 @@ impl agentService { if p.term_master.is_some() { fd = p.term_master.unwrap(); + epoller = p.epoller.clone(); } else if stdout { if p.parent_stdout.is_some() { fd = p.parent_stdout.unwrap(); @@ -494,6 +500,17 @@ impl agentService { } } + if let Some(epoller) = epoller { + // The process's epoller's poll() will return a file descriptor of the process's + // terminal or one end of its exited pipe. If it returns its terminal, it means + // there is data needed to be read out or it has been closed; if it returns the + // process's exited pipe, it means the process has exited and there is no data + // needed to be read out in its terminal, thus following read on it will read out + // "EOF" to terminate this process's io since the other end of this pipe has been + // closed in reap(). + fd = epoller.poll()?; + } + if fd == -1 { return Err(anyhow!(nix::Error::from_errno(nix::errno::Errno::EINVAL))); } @@ -808,6 +825,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { p.parent_stdin = None; } + p.close_epoller(); + Ok(Empty::new()) }