mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
agent: use channel instead of pipe to send exit signal of process
The situation is not a IPC scene, pipe(2) is too heavy. We have tokio::sync:⌚:channel after tokio has been introduced. The channel has better performance and easy to use. Fixes: #1721 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
0f2fe4a418
commit
8ecf8e5c1f
@ -908,7 +908,7 @@ impl BaseContainer for LinuxContainer {
|
||||
child = child.env(PIDNS_FD, format!("{}", pidns.unwrap()));
|
||||
}
|
||||
|
||||
let child = child.spawn()?;
|
||||
child.spawn()?;
|
||||
|
||||
unistd::close(crfd)?;
|
||||
unistd::close(cwfd)?;
|
||||
@ -964,19 +964,6 @@ impl BaseContainer for LinuxContainer {
|
||||
|
||||
self.created = SystemTime::now();
|
||||
|
||||
// create the pipes for notify process exited
|
||||
let (exit_pipe_r, exit_pipe_w) = unistd::pipe2(OFlag::O_CLOEXEC)
|
||||
.context("failed to create pipe")
|
||||
.map_err(|e| {
|
||||
let _ = signal::kill(Pid::from_raw(child.id() as i32), Some(Signal::SIGKILL))
|
||||
.map_err(|e| warn!(logger, "signal::kill creating pipe {:?}", e));
|
||||
|
||||
e
|
||||
})?;
|
||||
|
||||
p.exit_pipe_w = Some(exit_pipe_w);
|
||||
p.exit_pipe_r = Some(exit_pipe_r);
|
||||
|
||||
if p.init {
|
||||
let spec = self.config.spec.as_mut().unwrap();
|
||||
update_namespaces(&self.logger, spec, p.pid)?;
|
||||
|
@ -29,7 +29,6 @@ pub enum StreamType {
|
||||
Stdin,
|
||||
Stdout,
|
||||
Stderr,
|
||||
ExitPipeR,
|
||||
TermMaster,
|
||||
ParentStdin,
|
||||
ParentStdout,
|
||||
@ -45,8 +44,8 @@ pub struct Process {
|
||||
pub stdin: Option<RawFd>,
|
||||
pub stdout: Option<RawFd>,
|
||||
pub stderr: Option<RawFd>,
|
||||
pub exit_pipe_r: Option<RawFd>,
|
||||
pub exit_pipe_w: Option<RawFd>,
|
||||
pub exit_tx: Option<tokio::sync::watch::Sender<bool>>,
|
||||
pub exit_rx: Option<tokio::sync::watch::Receiver<bool>>,
|
||||
pub extra_files: Vec<File>,
|
||||
pub term_master: Option<RawFd>,
|
||||
pub tty: bool,
|
||||
@ -97,14 +96,15 @@ impl Process {
|
||||
pipe_size: i32,
|
||||
) -> Result<Self> {
|
||||
let logger = logger.new(o!("subsystem" => "process"));
|
||||
let (exit_tx, exit_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
let mut p = Process {
|
||||
exec_id: String::from(id),
|
||||
stdin: None,
|
||||
stdout: None,
|
||||
stderr: None,
|
||||
exit_pipe_w: None,
|
||||
exit_pipe_r: None,
|
||||
exit_tx: Some(exit_tx),
|
||||
exit_rx: Some(exit_rx),
|
||||
extra_files: Vec::new(),
|
||||
tty: ocip.terminal,
|
||||
term_master: None,
|
||||
@ -152,7 +152,6 @@ impl Process {
|
||||
StreamType::Stdin => self.stdin,
|
||||
StreamType::Stdout => self.stdout,
|
||||
StreamType::Stderr => self.stderr,
|
||||
StreamType::ExitPipeR => self.exit_pipe_r,
|
||||
StreamType::TermMaster => self.term_master,
|
||||
StreamType::ParentStdin => self.parent_stdin,
|
||||
StreamType::ParentStdout => self.parent_stdout,
|
||||
|
@ -369,7 +369,6 @@ impl AgentService {
|
||||
let s = self.sandbox.clone();
|
||||
let mut resp = WaitProcessResponse::new();
|
||||
let pid: pid_t;
|
||||
let stream;
|
||||
|
||||
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
@ -380,22 +379,20 @@ impl AgentService {
|
||||
"exec-id" => eid.clone()
|
||||
);
|
||||
|
||||
{
|
||||
let exit_rx = {
|
||||
let mut sandbox = s.lock().await;
|
||||
let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?;
|
||||
|
||||
stream = p.get_reader(StreamType::ExitPipeR);
|
||||
|
||||
p.exit_watchers.push(exit_send);
|
||||
pid = p.pid;
|
||||
}
|
||||
|
||||
if stream.is_some() {
|
||||
info!(sl!(), "reading exit pipe");
|
||||
p.exit_rx.clone()
|
||||
};
|
||||
|
||||
let reader = stream.unwrap();
|
||||
let mut content: Vec<u8> = vec![0, 1];
|
||||
let _ = reader.lock().await.read(&mut content).await;
|
||||
if let Some(mut exit_rx) = exit_rx {
|
||||
info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid);
|
||||
while exit_rx.changed().await.is_ok() {}
|
||||
info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid);
|
||||
}
|
||||
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -1550,11 +1547,6 @@ fn cleanup_process(p: &mut Process) -> Result<()> {
|
||||
let _ = unistd::close(p.term_master.unwrap())?;
|
||||
}
|
||||
|
||||
if p.exit_pipe_r.is_some() {
|
||||
p.close_stream(StreamType::ExitPipeR);
|
||||
let _ = unistd::close(p.exit_pipe_r.unwrap())?;
|
||||
}
|
||||
|
||||
p.notify_term_close();
|
||||
|
||||
p.parent_stdin = None;
|
||||
|
@ -55,13 +55,6 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
||||
}
|
||||
|
||||
let mut p = process.unwrap();
|
||||
|
||||
if p.exit_pipe_w.is_none() {
|
||||
info!(logger, "process exit pipe not set");
|
||||
continue;
|
||||
}
|
||||
|
||||
let pipe_write = p.exit_pipe_w.unwrap();
|
||||
let ret: i32;
|
||||
|
||||
match wait_status {
|
||||
@ -75,7 +68,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
|
||||
}
|
||||
|
||||
p.exit_code = ret;
|
||||
let _ = unistd::close(pipe_write);
|
||||
let _ = p.exit_tx.take();
|
||||
|
||||
info!(logger, "notify term to close");
|
||||
// close the socket file to notify readStdio to close terminal specifically
|
||||
|
Loading…
Reference in New Issue
Block a user