mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
agent: use tokio Notify instead of epoll to fix #1160
Fixes: #1160 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
332fa4c65f
commit
9f79ddb9df
11
src/agent/Cargo.lock
generated
11
src/agent/Cargo.lock
generated
@ -267,16 +267,6 @@ version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
||||
|
||||
[[package]]
|
||||
name = "epoll"
|
||||
version = "4.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "errno"
|
||||
version = "0.2.6"
|
||||
@ -1305,7 +1295,6 @@ dependencies = [
|
||||
"caps",
|
||||
"cgroups-rs",
|
||||
"dirs",
|
||||
"epoll",
|
||||
"futures",
|
||||
"inotify",
|
||||
"lazy_static",
|
||||
|
@ -26,7 +26,6 @@ dirs = "3.0.1"
|
||||
anyhow = "1.0.32"
|
||||
cgroups = { package = "cgroups-rs", version = "0.2.1" }
|
||||
tempfile = "3.1.0"
|
||||
epoll = "4.3.1"
|
||||
|
||||
tokio = { version = "0.2", features = ["sync", "io-util", "process", "time", "macros"] }
|
||||
futures = "0.3"
|
||||
|
@ -42,7 +42,6 @@ pub mod container;
|
||||
pub mod mount;
|
||||
pub mod pipestream;
|
||||
pub mod process;
|
||||
pub mod reaper;
|
||||
pub mod specconv;
|
||||
pub mod sync;
|
||||
pub mod sync_with_async;
|
||||
|
@ -14,7 +14,6 @@ use nix::sys::wait::{self, WaitStatus};
|
||||
use nix::unistd::{self, Pid};
|
||||
use nix::Result;
|
||||
|
||||
use crate::reaper::Epoller;
|
||||
use oci::Process as OCIProcess;
|
||||
use slog::Logger;
|
||||
|
||||
@ -23,6 +22,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{split, ReadHalf, WriteHalf};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
pub enum StreamType {
|
||||
@ -62,7 +62,7 @@ pub struct Process {
|
||||
pub exit_watchers: Vec<Sender<i32>>,
|
||||
pub oci: OCIProcess,
|
||||
pub logger: Logger,
|
||||
pub epoller: Option<Epoller>,
|
||||
pub term_exit_notifier: Arc<Notify>,
|
||||
|
||||
readers: HashMap<StreamType, Reader>,
|
||||
writers: HashMap<StreamType, Writer>,
|
||||
@ -117,7 +117,7 @@ impl Process {
|
||||
exit_watchers: Vec::new(),
|
||||
oci: ocip.clone(),
|
||||
logger: logger.clone(),
|
||||
epoller: None,
|
||||
term_exit_notifier: Arc::new(Notify::new()),
|
||||
readers: HashMap::new(),
|
||||
writers: HashMap::new(),
|
||||
};
|
||||
@ -142,27 +142,9 @@ impl Process {
|
||||
Ok(p)
|
||||
}
|
||||
|
||||
pub fn close_epoller(&mut self) {
|
||||
if let Some(epoller) = self.epoller.take() {
|
||||
epoller.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_epoller(&mut self) -> anyhow::Result<()> {
|
||||
match self.term_master {
|
||||
Some(term_master) => {
|
||||
// add epoller to process
|
||||
let epoller = Epoller::new(&self.logger, term_master)?;
|
||||
self.epoller = Some(epoller)
|
||||
}
|
||||
None => {
|
||||
info!(
|
||||
self.logger,
|
||||
"try to add epoller to a process without a term master fd"
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub fn notify_term_close(&mut self) {
|
||||
let notify = self.term_exit_notifier.clone();
|
||||
notify.notify();
|
||||
}
|
||||
|
||||
fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> {
|
||||
@ -216,7 +198,6 @@ impl Process {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> {
|
||||
let (r, w) = unistd::pipe2(flags)?;
|
||||
if pipe_size > 0 {
|
||||
|
@ -1,150 +0,0 @@
|
||||
// Copyright (c) 2020 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use nix::fcntl::OFlag;
|
||||
use slog::Logger;
|
||||
|
||||
use nix::unistd;
|
||||
use std::os::unix::io::RawFd;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
const MAX_EVENTS: usize = 2;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Epoller {
|
||||
logger: Logger,
|
||||
epoll_fd: RawFd,
|
||||
// rfd and wfd are a pipe's files two ends, this pipe is
|
||||
// used to sync between the readStdio and the process exits.
|
||||
// once the process exits, it will close one end to notify
|
||||
// the readStdio that the process has exited and it should not
|
||||
// wait on the process's terminal which has been inherited
|
||||
// by it's children and hasn't exited.
|
||||
rfd: RawFd,
|
||||
wfd: RawFd,
|
||||
}
|
||||
|
||||
impl Epoller {
|
||||
pub fn new(logger: &Logger, fd: RawFd) -> Result<Epoller> {
|
||||
let epoll_fd = epoll::create(true)?;
|
||||
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
|
||||
|
||||
let mut epoller = Self {
|
||||
logger: logger.clone(),
|
||||
epoll_fd,
|
||||
rfd,
|
||||
wfd,
|
||||
};
|
||||
|
||||
epoller.add(rfd)?;
|
||||
epoller.add(fd)?;
|
||||
|
||||
Ok(epoller)
|
||||
}
|
||||
|
||||
pub fn close_wfd(&self) {
|
||||
let _ = unistd::close(self.wfd);
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
let _ = unistd::close(self.rfd);
|
||||
let _ = unistd::close(self.wfd);
|
||||
let _ = unistd::close(self.epoll_fd);
|
||||
}
|
||||
|
||||
fn add(&mut self, fd: RawFd) -> Result<()> {
|
||||
info!(self.logger, "Epoller add fd {}", fd);
|
||||
// add creates an epoll which is used to monitor the process's pty's master and
|
||||
// one end of its exit notify pipe. Those files will be registered with level-triggered
|
||||
// notification.
|
||||
epoll::ctl(
|
||||
self.epoll_fd,
|
||||
epoll::ControlOptions::EPOLL_CTL_ADD,
|
||||
fd,
|
||||
epoll::Event::new(
|
||||
epoll::Events::EPOLLHUP
|
||||
| epoll::Events::EPOLLIN
|
||||
| epoll::Events::EPOLLERR
|
||||
| epoll::Events::EPOLLRDHUP,
|
||||
fd as u64,
|
||||
),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// There will be three cases on the epoller once it poll:
|
||||
// a: only pty's master get an event(other than self.rfd);
|
||||
// b: only the pipe get an event(self.rfd);
|
||||
// c: both of pty and pipe have event occur;
|
||||
// for case a, it means there is output in process's terminal and what needed to do is
|
||||
// just read the terminal and send them out; for case b, it means the process has exited
|
||||
// and there is no data in the terminal, thus just return the "EOF" to end the io;
|
||||
// for case c, it means the process has exited but there is some data in the terminal which
|
||||
// hasn't been send out, thus it should send those data out first and then send "EOF" last to
|
||||
// end the io.
|
||||
pub fn poll(&self) -> Result<RawFd> {
|
||||
let mut rfd = self.rfd;
|
||||
let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); MAX_EVENTS];
|
||||
|
||||
loop {
|
||||
let event_count = match epoll::wait(self.epoll_fd, -1, epoll_events.as_mut_slice()) {
|
||||
Ok(ec) => ec,
|
||||
Err(e) => {
|
||||
info!(self.logger, "loop wait err {:?}", e);
|
||||
// EINTR: The call was interrupted by a signal handler before either
|
||||
// any of the requested events occurred or the timeout expired
|
||||
if e.kind() == std::io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
for event in epoll_events.iter().take(event_count) {
|
||||
let fd = event.data as i32;
|
||||
// fd has been assigned with one end of process's exited pipe by default, and
|
||||
// here to check is there any event occur on process's terminal, if "yes", it
|
||||
// should be dealt first, otherwise, it means the process has exited and there
|
||||
// is nothing left in the process's terminal needed to be read.
|
||||
if fd != rfd {
|
||||
rfd = fd;
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
Ok(rfd)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Epoller;
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::unistd;
|
||||
use std::thread;
|
||||
|
||||
#[test]
|
||||
fn test_epoller_poll() {
|
||||
let logger = slog::Logger::root(slog::Discard, o!());
|
||||
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap();
|
||||
let epoller = Epoller::new(&logger, rfd).unwrap();
|
||||
|
||||
let child = thread::spawn(move || {
|
||||
let _ = unistd::write(wfd, "temporary file's content".as_bytes());
|
||||
});
|
||||
|
||||
// wait write to finish
|
||||
let _ = child.join();
|
||||
|
||||
let fd = epoller.poll().unwrap();
|
||||
assert_eq!(fd, rfd, "Should get rfd");
|
||||
|
||||
epoller.close();
|
||||
}
|
||||
}
|
@ -383,12 +383,10 @@ async fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) ->
|
||||
p.exit_code = ret;
|
||||
let _ = unistd::close(pipe_write);
|
||||
|
||||
if let Some(ref poller) = p.epoller {
|
||||
info!(logger, "close epoller");
|
||||
// close the socket file to notify readStdio to close terminal specifically
|
||||
// in case this process's terminal has been inherited by its children.
|
||||
poller.close_wfd()
|
||||
}
|
||||
info!(logger, "notify term to close");
|
||||
// close the socket file to notify readStdio to close terminal specifically
|
||||
// in case this process's terminal has been inherited by its children.
|
||||
p.notify_term_close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -201,10 +201,6 @@ impl agentService {
|
||||
}
|
||||
}
|
||||
|
||||
// set epoller
|
||||
let p = find_process(&mut s, cid.as_str(), "", true)?;
|
||||
p.create_epoller()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -312,10 +308,6 @@ impl agentService {
|
||||
|
||||
ctr.run(p).await?;
|
||||
|
||||
// set epoller
|
||||
let p = find_process(&mut sandbox, cid.as_str(), exec_id.as_str(), false)?;
|
||||
p.create_epoller()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -459,9 +451,7 @@ impl agentService {
|
||||
let cid = req.container_id;
|
||||
let eid = req.exec_id;
|
||||
|
||||
// let mut fd: RawFd = -1;
|
||||
// let mut epoller: Option<reaper::Epoller> = None;
|
||||
|
||||
let mut term_exit_notifier = Arc::new(tokio::sync::Notify::new());
|
||||
let reader = {
|
||||
let s = self.sandbox.clone();
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -469,7 +459,7 @@ impl agentService {
|
||||
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
||||
|
||||
if p.term_master.is_some() {
|
||||
// epoller = p.epoller.clone();
|
||||
term_exit_notifier = p.term_exit_notifier.clone();
|
||||
p.get_reader(StreamType::TermMaster)
|
||||
} else if stdout {
|
||||
if p.parent_stdout.is_some() {
|
||||
@ -487,12 +477,19 @@ impl agentService {
|
||||
}
|
||||
|
||||
let reader = reader.unwrap();
|
||||
let vector = read_stream(reader, req.len as usize).await?;
|
||||
|
||||
let mut resp = ReadStreamResponse::new();
|
||||
resp.set_data(vector);
|
||||
tokio::select! {
|
||||
_ = term_exit_notifier.notified() => {
|
||||
Err(anyhow!("eof"))
|
||||
}
|
||||
v = read_stream(reader, req.len as usize) => {
|
||||
let vector = v?;
|
||||
let mut resp = ReadStreamResponse::new();
|
||||
resp.set_data(vector);
|
||||
|
||||
Ok(resp)
|
||||
Ok(resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -805,7 +802,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
|
||||
p.parent_stdin = None;
|
||||
}
|
||||
|
||||
p.close_epoller();
|
||||
p.notify_term_close();
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1317,7 +1314,7 @@ async fn read_stream(reader: Arc<Mutex<ReadHalf<PipeStream>>>, l: usize) -> Resu
|
||||
content.resize(len, 0);
|
||||
|
||||
if len == 0 {
|
||||
return Err(anyhow!("read meet eof"));
|
||||
return Err(anyhow!("read meet eof"));
|
||||
}
|
||||
|
||||
Ok(content)
|
||||
@ -1627,7 +1624,7 @@ fn cleanup_process(p: &mut Process) -> Result<()> {
|
||||
let _ = unistd::close(p.exit_pipe_r.unwrap())?;
|
||||
}
|
||||
|
||||
p.close_epoller();
|
||||
p.notify_term_close();
|
||||
|
||||
p.parent_stdin = None;
|
||||
p.parent_stdout = None;
|
||||
|
Loading…
Reference in New Issue
Block a user