agent: use channel instead of pipe to send exit signal of process

The situation is not a IPC scene, pipe(2) is too heavy.

We have tokio::sync::channel after tokio has been introduced.
The channel has better performance and easy to use.

Fixes: #1721

Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
Tim Zhang 2021-04-21 16:39:13 +08:00
parent 0f2fe4a418
commit 8ecf8e5c1f
4 changed files with 14 additions and 43 deletions

View File

@ -908,7 +908,7 @@ impl BaseContainer for LinuxContainer {
child = child.env(PIDNS_FD, format!("{}", pidns.unwrap())); child = child.env(PIDNS_FD, format!("{}", pidns.unwrap()));
} }
let child = child.spawn()?; child.spawn()?;
unistd::close(crfd)?; unistd::close(crfd)?;
unistd::close(cwfd)?; unistd::close(cwfd)?;
@ -964,19 +964,6 @@ impl BaseContainer for LinuxContainer {
self.created = SystemTime::now(); self.created = SystemTime::now();
// create the pipes for notify process exited
let (exit_pipe_r, exit_pipe_w) = unistd::pipe2(OFlag::O_CLOEXEC)
.context("failed to create pipe")
.map_err(|e| {
let _ = signal::kill(Pid::from_raw(child.id() as i32), Some(Signal::SIGKILL))
.map_err(|e| warn!(logger, "signal::kill creating pipe {:?}", e));
e
})?;
p.exit_pipe_w = Some(exit_pipe_w);
p.exit_pipe_r = Some(exit_pipe_r);
if p.init { if p.init {
let spec = self.config.spec.as_mut().unwrap(); let spec = self.config.spec.as_mut().unwrap();
update_namespaces(&self.logger, spec, p.pid)?; update_namespaces(&self.logger, spec, p.pid)?;

View File

@ -29,7 +29,6 @@ pub enum StreamType {
Stdin, Stdin,
Stdout, Stdout,
Stderr, Stderr,
ExitPipeR,
TermMaster, TermMaster,
ParentStdin, ParentStdin,
ParentStdout, ParentStdout,
@ -45,8 +44,8 @@ pub struct Process {
pub stdin: Option<RawFd>, pub stdin: Option<RawFd>,
pub stdout: Option<RawFd>, pub stdout: Option<RawFd>,
pub stderr: Option<RawFd>, pub stderr: Option<RawFd>,
pub exit_pipe_r: Option<RawFd>, pub exit_tx: Option<tokio::sync::watch::Sender<bool>>,
pub exit_pipe_w: Option<RawFd>, pub exit_rx: Option<tokio::sync::watch::Receiver<bool>>,
pub extra_files: Vec<File>, pub extra_files: Vec<File>,
pub term_master: Option<RawFd>, pub term_master: Option<RawFd>,
pub tty: bool, pub tty: bool,
@ -97,14 +96,15 @@ impl Process {
pipe_size: i32, pipe_size: i32,
) -> Result<Self> { ) -> Result<Self> {
let logger = logger.new(o!("subsystem" => "process")); let logger = logger.new(o!("subsystem" => "process"));
let (exit_tx, exit_rx) = tokio::sync::watch::channel(false);
let mut p = Process { let mut p = Process {
exec_id: String::from(id), exec_id: String::from(id),
stdin: None, stdin: None,
stdout: None, stdout: None,
stderr: None, stderr: None,
exit_pipe_w: None, exit_tx: Some(exit_tx),
exit_pipe_r: None, exit_rx: Some(exit_rx),
extra_files: Vec::new(), extra_files: Vec::new(),
tty: ocip.terminal, tty: ocip.terminal,
term_master: None, term_master: None,
@ -152,7 +152,6 @@ impl Process {
StreamType::Stdin => self.stdin, StreamType::Stdin => self.stdin,
StreamType::Stdout => self.stdout, StreamType::Stdout => self.stdout,
StreamType::Stderr => self.stderr, StreamType::Stderr => self.stderr,
StreamType::ExitPipeR => self.exit_pipe_r,
StreamType::TermMaster => self.term_master, StreamType::TermMaster => self.term_master,
StreamType::ParentStdin => self.parent_stdin, StreamType::ParentStdin => self.parent_stdin,
StreamType::ParentStdout => self.parent_stdout, StreamType::ParentStdout => self.parent_stdout,

View File

@ -369,7 +369,6 @@ impl AgentService {
let s = self.sandbox.clone(); let s = self.sandbox.clone();
let mut resp = WaitProcessResponse::new(); let mut resp = WaitProcessResponse::new();
let pid: pid_t; let pid: pid_t;
let stream;
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100); let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
@ -380,22 +379,20 @@ impl AgentService {
"exec-id" => eid.clone() "exec-id" => eid.clone()
); );
{ let exit_rx = {
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?; let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
stream = p.get_reader(StreamType::ExitPipeR);
p.exit_watchers.push(exit_send); p.exit_watchers.push(exit_send);
pid = p.pid; pid = p.pid;
}
if stream.is_some() { p.exit_rx.clone()
info!(sl!(), "reading exit pipe"); };
let reader = stream.unwrap(); if let Some(mut exit_rx) = exit_rx {
let mut content: Vec<u8> = vec![0, 1]; info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid);
let _ = reader.lock().await.read(&mut content).await; while exit_rx.changed().await.is_ok() {}
info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid);
} }
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
@ -1550,11 +1547,6 @@ fn cleanup_process(p: &mut Process) -> Result<()> {
let _ = unistd::close(p.term_master.unwrap())?; let _ = unistd::close(p.term_master.unwrap())?;
} }
if p.exit_pipe_r.is_some() {
p.close_stream(StreamType::ExitPipeR);
let _ = unistd::close(p.exit_pipe_r.unwrap())?;
}
p.notify_term_close(); p.notify_term_close();
p.parent_stdin = None; p.parent_stdin = None;

View File

@ -55,13 +55,6 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
} }
let mut p = process.unwrap(); let mut p = process.unwrap();
if p.exit_pipe_w.is_none() {
info!(logger, "process exit pipe not set");
continue;
}
let pipe_write = p.exit_pipe_w.unwrap();
let ret: i32; let ret: i32;
match wait_status { match wait_status {
@ -75,7 +68,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
} }
p.exit_code = ret; p.exit_code = ret;
let _ = unistd::close(pipe_write); let _ = p.exit_tx.take();
info!(logger, "notify term to close"); info!(logger, "notify term to close");
// close the socket file to notify readStdio to close terminal specifically // close the socket file to notify readStdio to close terminal specifically