diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 7eb2f0bdc5..9118d2665f 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -267,16 +267,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "epoll" -version = "4.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0" -dependencies = [ - "bitflags", - "libc", -] - [[package]] name = "errno" version = "0.2.6" @@ -1305,7 +1295,6 @@ dependencies = [ "caps", "cgroups-rs", "dirs", - "epoll", "futures", "inotify", "lazy_static", diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index af8623b044..e191785470 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -26,7 +26,6 @@ dirs = "3.0.1" anyhow = "1.0.32" cgroups = { package = "cgroups-rs", version = "0.2.1" } tempfile = "3.1.0" -epoll = "4.3.1" tokio = { version = "0.2", features = ["sync", "io-util", "process", "time", "macros"] } futures = "0.3" diff --git a/src/agent/rustjail/src/lib.rs b/src/agent/rustjail/src/lib.rs index 59b5b26721..e29ce4b98a 100644 --- a/src/agent/rustjail/src/lib.rs +++ b/src/agent/rustjail/src/lib.rs @@ -42,7 +42,6 @@ pub mod container; pub mod mount; pub mod pipestream; pub mod process; -pub mod reaper; pub mod specconv; pub mod sync; pub mod sync_with_async; diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 2e1d6ebbd2..56ed1c0417 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -14,7 +14,6 @@ use nix::sys::wait::{self, WaitStatus}; use nix::unistd::{self, Pid}; use nix::Result; -use crate::reaper::Epoller; use oci::Process as OCIProcess; use slog::Logger; @@ -23,6 +22,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::io::{split, ReadHalf, WriteHalf}; use tokio::sync::Mutex; +use tokio::sync::Notify; #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum StreamType { @@ -62,7 +62,7 @@ pub struct Process { pub exit_watchers: Vec>, pub oci: OCIProcess, pub logger: Logger, - pub epoller: Option, + pub term_exit_notifier: Arc, readers: HashMap, writers: HashMap, @@ -117,7 +117,7 @@ impl Process { exit_watchers: Vec::new(), oci: ocip.clone(), logger: logger.clone(), - epoller: None, + term_exit_notifier: Arc::new(Notify::new()), readers: HashMap::new(), writers: HashMap::new(), }; @@ -142,27 +142,9 @@ impl Process { Ok(p) } - pub fn close_epoller(&mut self) { - if let Some(epoller) = self.epoller.take() { - epoller.close(); - } - } - - pub fn create_epoller(&mut self) -> anyhow::Result<()> { - match self.term_master { - Some(term_master) => { - // add epoller to process - let epoller = Epoller::new(&self.logger, term_master)?; - self.epoller = Some(epoller) - } - None => { - info!( - self.logger, - "try to add epoller to a process without a term master fd" - ); - } - } - Ok(()) + pub fn notify_term_close(&mut self) { + let notify = self.term_exit_notifier.clone(); + notify.notify(); } fn get_fd(&self, stream_type: &StreamType) -> Option { @@ -216,7 +198,6 @@ impl Process { } } - fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> { let (r, w) = unistd::pipe2(flags)?; if pipe_size > 0 { diff --git a/src/agent/rustjail/src/reaper.rs b/src/agent/rustjail/src/reaper.rs deleted file mode 100644 index 91b4d0e066..0000000000 --- a/src/agent/rustjail/src/reaper.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright (c) 2020 Ant Group -// -// SPDX-License-Identifier: Apache-2.0 -// - -use nix::fcntl::OFlag; -use slog::Logger; - -use nix::unistd; -use std::os::unix::io::RawFd; - -use anyhow::Result; - -const MAX_EVENTS: usize = 2; - -#[derive(Debug, Clone)] -pub struct Epoller { - logger: Logger, - epoll_fd: RawFd, - // rfd and wfd are a pipe's files two ends, this pipe is - // used to sync between the readStdio and the process exits. - // once the process exits, it will close one end to notify - // the readStdio that the process has exited and it should not - // wait on the process's terminal which has been inherited - // by it's children and hasn't exited. - rfd: RawFd, - wfd: RawFd, -} - -impl Epoller { - pub fn new(logger: &Logger, fd: RawFd) -> Result { - let epoll_fd = epoll::create(true)?; - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; - - let mut epoller = Self { - logger: logger.clone(), - epoll_fd, - rfd, - wfd, - }; - - epoller.add(rfd)?; - epoller.add(fd)?; - - Ok(epoller) - } - - pub fn close_wfd(&self) { - let _ = unistd::close(self.wfd); - } - - pub fn close(&self) { - let _ = unistd::close(self.rfd); - let _ = unistd::close(self.wfd); - let _ = unistd::close(self.epoll_fd); - } - - fn add(&mut self, fd: RawFd) -> Result<()> { - info!(self.logger, "Epoller add fd {}", fd); - // add creates an epoll which is used to monitor the process's pty's master and - // one end of its exit notify pipe. Those files will be registered with level-triggered - // notification. - epoll::ctl( - self.epoll_fd, - epoll::ControlOptions::EPOLL_CTL_ADD, - fd, - epoll::Event::new( - epoll::Events::EPOLLHUP - | epoll::Events::EPOLLIN - | epoll::Events::EPOLLERR - | epoll::Events::EPOLLRDHUP, - fd as u64, - ), - )?; - - Ok(()) - } - - // There will be three cases on the epoller once it poll: - // a: only pty's master get an event(other than self.rfd); - // b: only the pipe get an event(self.rfd); - // c: both of pty and pipe have event occur; - // for case a, it means there is output in process's terminal and what needed to do is - // just read the terminal and send them out; for case b, it means the process has exited - // and there is no data in the terminal, thus just return the "EOF" to end the io; - // for case c, it means the process has exited but there is some data in the terminal which - // hasn't been send out, thus it should send those data out first and then send "EOF" last to - // end the io. - pub fn poll(&self) -> Result { - let mut rfd = self.rfd; - let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); MAX_EVENTS]; - - loop { - let event_count = match epoll::wait(self.epoll_fd, -1, epoll_events.as_mut_slice()) { - Ok(ec) => ec, - Err(e) => { - info!(self.logger, "loop wait err {:?}", e); - // EINTR: The call was interrupted by a signal handler before either - // any of the requested events occurred or the timeout expired - if e.kind() == std::io::ErrorKind::Interrupted { - continue; - } - return Err(e.into()); - } - }; - - for event in epoll_events.iter().take(event_count) { - let fd = event.data as i32; - // fd has been assigned with one end of process's exited pipe by default, and - // here to check is there any event occur on process's terminal, if "yes", it - // should be dealt first, otherwise, it means the process has exited and there - // is nothing left in the process's terminal needed to be read. - if fd != rfd { - rfd = fd; - break; - } - } - break; - } - - Ok(rfd) - } -} - -#[cfg(test)] -mod tests { - use super::Epoller; - use nix::fcntl::OFlag; - use nix::unistd; - use std::thread; - - #[test] - fn test_epoller_poll() { - let logger = slog::Logger::root(slog::Discard, o!()); - let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); - let epoller = Epoller::new(&logger, rfd).unwrap(); - - let child = thread::spawn(move || { - let _ = unistd::write(wfd, "temporary file's content".as_bytes()); - }); - - // wait write to finish - let _ = child.join(); - - let fd = epoller.poll().unwrap(); - assert_eq!(fd, rfd, "Should get rfd"); - - epoller.close(); - } -} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index b75f90e07f..dba964022b 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -383,12 +383,10 @@ async fn setup_signal_handler(logger: &Logger, sandbox: Arc>) -> p.exit_code = ret; let _ = unistd::close(pipe_write); - if let Some(ref poller) = p.epoller { - info!(logger, "close epoller"); - // close the socket file to notify readStdio to close terminal specifically - // in case this process's terminal has been inherited by its children. - poller.close_wfd() - } + info!(logger, "notify term to close"); + // close the socket file to notify readStdio to close terminal specifically + // in case this process's terminal has been inherited by its children. + p.notify_term_close(); } } } diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 2efd77f9f7..7d9a657ade 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -201,10 +201,6 @@ impl agentService { } } - // set epoller - let p = find_process(&mut s, cid.as_str(), "", true)?; - p.create_epoller()?; - Ok(()) } @@ -312,10 +308,6 @@ impl agentService { ctr.run(p).await?; - // set epoller - let p = find_process(&mut sandbox, cid.as_str(), exec_id.as_str(), false)?; - p.create_epoller()?; - Ok(()) } @@ -459,9 +451,7 @@ impl agentService { let cid = req.container_id; let eid = req.exec_id; - // let mut fd: RawFd = -1; - // let mut epoller: Option = None; - + let mut term_exit_notifier = Arc::new(tokio::sync::Notify::new()); let reader = { let s = self.sandbox.clone(); let mut sandbox = s.lock().await; @@ -469,7 +459,7 @@ impl agentService { let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?; if p.term_master.is_some() { - // epoller = p.epoller.clone(); + term_exit_notifier = p.term_exit_notifier.clone(); p.get_reader(StreamType::TermMaster) } else if stdout { if p.parent_stdout.is_some() { @@ -487,12 +477,19 @@ impl agentService { } let reader = reader.unwrap(); - let vector = read_stream(reader, req.len as usize).await?; - let mut resp = ReadStreamResponse::new(); - resp.set_data(vector); + tokio::select! { + _ = term_exit_notifier.notified() => { + Err(anyhow!("eof")) + } + v = read_stream(reader, req.len as usize) => { + let vector = v?; + let mut resp = ReadStreamResponse::new(); + resp.set_data(vector); - Ok(resp) + Ok(resp) + } + } } } @@ -805,7 +802,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { p.parent_stdin = None; } - p.close_epoller(); + p.notify_term_close(); Ok(Empty::new()) } @@ -1317,7 +1314,7 @@ async fn read_stream(reader: Arc>>, l: usize) -> Resu content.resize(len, 0); if len == 0 { - return Err(anyhow!("read meet eof")); + return Err(anyhow!("read meet eof")); } Ok(content) @@ -1627,7 +1624,7 @@ fn cleanup_process(p: &mut Process) -> Result<()> { let _ = unistd::close(p.exit_pipe_r.unwrap())?; } - p.close_epoller(); + p.notify_term_close(); p.parent_stdin = None; p.parent_stdout = None;