mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-10-24 05:31:31 +00:00
agent: use channel instead of pipe to send exit signal of process
The situation is not a IPC scene, pipe(2) is too heavy. We have tokio::sync:⌚:channel after tokio has been introduced. The channel has better performance and easy to use. Fixes: #1721 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
@@ -908,7 +908,7 @@ impl BaseContainer for LinuxContainer {
|
|||||||
child = child.env(PIDNS_FD, format!("{}", pidns.unwrap()));
|
child = child.env(PIDNS_FD, format!("{}", pidns.unwrap()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let child = child.spawn()?;
|
child.spawn()?;
|
||||||
|
|
||||||
unistd::close(crfd)?;
|
unistd::close(crfd)?;
|
||||||
unistd::close(cwfd)?;
|
unistd::close(cwfd)?;
|
||||||
@@ -964,19 +964,6 @@ impl BaseContainer for LinuxContainer {
|
|||||||
|
|
||||||
self.created = SystemTime::now();
|
self.created = SystemTime::now();
|
||||||
|
|
||||||
// create the pipes for notify process exited
|
|
||||||
let (exit_pipe_r, exit_pipe_w) = unistd::pipe2(OFlag::O_CLOEXEC)
|
|
||||||
.context("failed to create pipe")
|
|
||||||
.map_err(|e| {
|
|
||||||
let _ = signal::kill(Pid::from_raw(child.id() as i32), Some(Signal::SIGKILL))
|
|
||||||
.map_err(|e| warn!(logger, "signal::kill creating pipe {:?}", e));
|
|
||||||
|
|
||||||
e
|
|
||||||
})?;
|
|
||||||
|
|
||||||
p.exit_pipe_w = Some(exit_pipe_w);
|
|
||||||
p.exit_pipe_r = Some(exit_pipe_r);
|
|
||||||
|
|
||||||
if p.init {
|
if p.init {
|
||||||
let spec = self.config.spec.as_mut().unwrap();
|
let spec = self.config.spec.as_mut().unwrap();
|
||||||
update_namespaces(&self.logger, spec, p.pid)?;
|
update_namespaces(&self.logger, spec, p.pid)?;
|
||||||
|
@@ -29,7 +29,6 @@ pub enum StreamType {
|
|||||||
Stdin,
|
Stdin,
|
||||||
Stdout,
|
Stdout,
|
||||||
Stderr,
|
Stderr,
|
||||||
ExitPipeR,
|
|
||||||
TermMaster,
|
TermMaster,
|
||||||
ParentStdin,
|
ParentStdin,
|
||||||
ParentStdout,
|
ParentStdout,
|
||||||
@@ -45,8 +44,8 @@ pub struct Process {
|
|||||||
pub stdin: Option<RawFd>,
|
pub stdin: Option<RawFd>,
|
||||||
pub stdout: Option<RawFd>,
|
pub stdout: Option<RawFd>,
|
||||||
pub stderr: Option<RawFd>,
|
pub stderr: Option<RawFd>,
|
||||||
pub exit_pipe_r: Option<RawFd>,
|
pub exit_tx: Option<tokio::sync::watch::Sender<bool>>,
|
||||||
pub exit_pipe_w: Option<RawFd>,
|
pub exit_rx: Option<tokio::sync::watch::Receiver<bool>>,
|
||||||
pub extra_files: Vec<File>,
|
pub extra_files: Vec<File>,
|
||||||
pub term_master: Option<RawFd>,
|
pub term_master: Option<RawFd>,
|
||||||
pub tty: bool,
|
pub tty: bool,
|
||||||
@@ -97,14 +96,15 @@ impl Process {
|
|||||||
pipe_size: i32,
|
pipe_size: i32,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let logger = logger.new(o!("subsystem" => "process"));
|
let logger = logger.new(o!("subsystem" => "process"));
|
||||||
|
let (exit_tx, exit_rx) = tokio::sync::watch::channel(false);
|
||||||
|
|
||||||
let mut p = Process {
|
let mut p = Process {
|
||||||
exec_id: String::from(id),
|
exec_id: String::from(id),
|
||||||
stdin: None,
|
stdin: None,
|
||||||
stdout: None,
|
stdout: None,
|
||||||
stderr: None,
|
stderr: None,
|
||||||
exit_pipe_w: None,
|
exit_tx: Some(exit_tx),
|
||||||
exit_pipe_r: None,
|
exit_rx: Some(exit_rx),
|
||||||
extra_files: Vec::new(),
|
extra_files: Vec::new(),
|
||||||
tty: ocip.terminal,
|
tty: ocip.terminal,
|
||||||
term_master: None,
|
term_master: None,
|
||||||
@@ -152,7 +152,6 @@ impl Process {
|
|||||||
StreamType::Stdin => self.stdin,
|
StreamType::Stdin => self.stdin,
|
||||||
StreamType::Stdout => self.stdout,
|
StreamType::Stdout => self.stdout,
|
||||||
StreamType::Stderr => self.stderr,
|
StreamType::Stderr => self.stderr,
|
||||||
StreamType::ExitPipeR => self.exit_pipe_r,
|
|
||||||
StreamType::TermMaster => self.term_master,
|
StreamType::TermMaster => self.term_master,
|
||||||
StreamType::ParentStdin => self.parent_stdin,
|
StreamType::ParentStdin => self.parent_stdin,
|
||||||
StreamType::ParentStdout => self.parent_stdout,
|
StreamType::ParentStdout => self.parent_stdout,
|
||||||
|
@@ -369,7 +369,6 @@ impl AgentService {
|
|||||||
let s = self.sandbox.clone();
|
let s = self.sandbox.clone();
|
||||||
let mut resp = WaitProcessResponse::new();
|
let mut resp = WaitProcessResponse::new();
|
||||||
let pid: pid_t;
|
let pid: pid_t;
|
||||||
let stream;
|
|
||||||
|
|
||||||
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
||||||
|
|
||||||
@@ -380,22 +379,20 @@ impl AgentService {
|
|||||||
"exec-id" => eid.clone()
|
"exec-id" => eid.clone()
|
||||||
);
|
);
|
||||||
|
|
||||||
{
|
let exit_rx = {
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
||||||
|
|
||||||
stream = p.get_reader(StreamType::ExitPipeR);
|
|
||||||
|
|
||||||
p.exit_watchers.push(exit_send);
|
p.exit_watchers.push(exit_send);
|
||||||
pid = p.pid;
|
pid = p.pid;
|
||||||
}
|
|
||||||
|
|
||||||
if stream.is_some() {
|
p.exit_rx.clone()
|
||||||
info!(sl!(), "reading exit pipe");
|
};
|
||||||
|
|
||||||
let reader = stream.unwrap();
|
if let Some(mut exit_rx) = exit_rx {
|
||||||
let mut content: Vec<u8> = vec![0, 1];
|
info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid);
|
||||||
let _ = reader.lock().await.read(&mut content).await;
|
while exit_rx.changed().await.is_ok() {}
|
||||||
|
info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
@@ -1550,11 +1547,6 @@ fn cleanup_process(p: &mut Process) -> Result<()> {
|
|||||||
let _ = unistd::close(p.term_master.unwrap())?;
|
let _ = unistd::close(p.term_master.unwrap())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.exit_pipe_r.is_some() {
|
|
||||||
p.close_stream(StreamType::ExitPipeR);
|
|
||||||
let _ = unistd::close(p.exit_pipe_r.unwrap())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
p.notify_term_close();
|
p.notify_term_close();
|
||||||
|
|
||||||
p.parent_stdin = None;
|
p.parent_stdin = None;
|
||||||
|
@@ -55,13 +55,6 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut p = process.unwrap();
|
let mut p = process.unwrap();
|
||||||
|
|
||||||
if p.exit_pipe_w.is_none() {
|
|
||||||
info!(logger, "process exit pipe not set");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let pipe_write = p.exit_pipe_w.unwrap();
|
|
||||||
let ret: i32;
|
let ret: i32;
|
||||||
|
|
||||||
match wait_status {
|
match wait_status {
|
||||||
@@ -75,7 +68,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
|||||||
}
|
}
|
||||||
|
|
||||||
p.exit_code = ret;
|
p.exit_code = ret;
|
||||||
let _ = unistd::close(pipe_write);
|
let _ = p.exit_tx.take();
|
||||||
|
|
||||||
info!(logger, "notify term to close");
|
info!(logger, "notify term to close");
|
||||||
// close the socket file to notify readStdio to close terminal specifically
|
// close the socket file to notify readStdio to close terminal specifically
|
||||||
|
Reference in New Issue
Block a user