agent: use channel instead of pipe to send exit signal of process

The situation is not a IPC scene, pipe(2) is too heavy.

We have tokio::sync::channel after tokio has been introduced.
The channel has better performance and easy to use.

Fixes: #1721

Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
Tim Zhang 2021-04-21 16:39:13 +08:00
parent 0f2fe4a418
commit 8ecf8e5c1f
4 changed files with 14 additions and 43 deletions

View File

@ -908,7 +908,7 @@ impl BaseContainer for LinuxContainer {
child = child.env(PIDNS_FD, format!("{}", pidns.unwrap()));
}
let child = child.spawn()?;
child.spawn()?;
unistd::close(crfd)?;
unistd::close(cwfd)?;
@ -964,19 +964,6 @@ impl BaseContainer for LinuxContainer {
self.created = SystemTime::now();
// create the pipes for notify process exited
let (exit_pipe_r, exit_pipe_w) = unistd::pipe2(OFlag::O_CLOEXEC)
.context("failed to create pipe")
.map_err(|e| {
let _ = signal::kill(Pid::from_raw(child.id() as i32), Some(Signal::SIGKILL))
.map_err(|e| warn!(logger, "signal::kill creating pipe {:?}", e));
e
})?;
p.exit_pipe_w = Some(exit_pipe_w);
p.exit_pipe_r = Some(exit_pipe_r);
if p.init {
let spec = self.config.spec.as_mut().unwrap();
update_namespaces(&self.logger, spec, p.pid)?;

View File

@ -29,7 +29,6 @@ pub enum StreamType {
Stdin,
Stdout,
Stderr,
ExitPipeR,
TermMaster,
ParentStdin,
ParentStdout,
@ -45,8 +44,8 @@ pub struct Process {
pub stdin: Option<RawFd>,
pub stdout: Option<RawFd>,
pub stderr: Option<RawFd>,
pub exit_pipe_r: Option<RawFd>,
pub exit_pipe_w: Option<RawFd>,
pub exit_tx: Option<tokio::sync::watch::Sender<bool>>,
pub exit_rx: Option<tokio::sync::watch::Receiver<bool>>,
pub extra_files: Vec<File>,
pub term_master: Option<RawFd>,
pub tty: bool,
@ -97,14 +96,15 @@ impl Process {
pipe_size: i32,
) -> Result<Self> {
let logger = logger.new(o!("subsystem" => "process"));
let (exit_tx, exit_rx) = tokio::sync::watch::channel(false);
let mut p = Process {
exec_id: String::from(id),
stdin: None,
stdout: None,
stderr: None,
exit_pipe_w: None,
exit_pipe_r: None,
exit_tx: Some(exit_tx),
exit_rx: Some(exit_rx),
extra_files: Vec::new(),
tty: ocip.terminal,
term_master: None,
@ -152,7 +152,6 @@ impl Process {
StreamType::Stdin => self.stdin,
StreamType::Stdout => self.stdout,
StreamType::Stderr => self.stderr,
StreamType::ExitPipeR => self.exit_pipe_r,
StreamType::TermMaster => self.term_master,
StreamType::ParentStdin => self.parent_stdin,
StreamType::ParentStdout => self.parent_stdout,

View File

@ -369,7 +369,6 @@ impl AgentService {
let s = self.sandbox.clone();
let mut resp = WaitProcessResponse::new();
let pid: pid_t;
let stream;
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
@ -380,22 +379,20 @@ impl AgentService {
"exec-id" => eid.clone()
);
{
let exit_rx = {
let mut sandbox = s.lock().await;
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
stream = p.get_reader(StreamType::ExitPipeR);
p.exit_watchers.push(exit_send);
pid = p.pid;
}
if stream.is_some() {
info!(sl!(), "reading exit pipe");
p.exit_rx.clone()
};
let reader = stream.unwrap();
let mut content: Vec<u8> = vec![0, 1];
let _ = reader.lock().await.read(&mut content).await;
if let Some(mut exit_rx) = exit_rx {
info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid);
while exit_rx.changed().await.is_ok() {}
info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid);
}
let mut sandbox = s.lock().await;
@ -1550,11 +1547,6 @@ fn cleanup_process(p: &mut Process) -> Result<()> {
let _ = unistd::close(p.term_master.unwrap())?;
}
if p.exit_pipe_r.is_some() {
p.close_stream(StreamType::ExitPipeR);
let _ = unistd::close(p.exit_pipe_r.unwrap())?;
}
p.notify_term_close();
p.parent_stdin = None;

View File

@ -55,13 +55,6 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
}
let mut p = process.unwrap();
if p.exit_pipe_w.is_none() {
info!(logger, "process exit pipe not set");
continue;
}
let pipe_write = p.exit_pipe_w.unwrap();
let ret: i32;
match wait_status {
@ -75,7 +68,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
}
p.exit_code = ret;
let _ = unistd::close(pipe_write);
let _ = p.exit_tx.take();
info!(logger, "notify term to close");
// close the socket file to notify readStdio to close terminal specifically