agent: allow multiple wait on the same process

Until a container is deleted, agent should allow runtime to wait for
a process in parallel, as being supported by the go agent.

Signed-off-by: Peng Tao <bergwolf@hyper.sh>
This commit is contained in:
Peng Tao 2020-09-16 21:00:45 +08:00
parent 6487044fa1
commit 22876b2da6
2 changed files with 20 additions and 2 deletions

View File

@ -7,6 +7,7 @@
use libc::pid_t; use libc::pid_t;
use std::fs::File; use std::fs::File;
use std::os::unix::io::RawFd; use std::os::unix::io::RawFd;
use std::sync::mpsc::Sender;
// use crate::configs::{Capabilities, Rlimit}; // use crate::configs::{Capabilities, Rlimit};
// use crate::cgroups::Manager as CgroupManager; // use crate::cgroups::Manager as CgroupManager;
@ -45,6 +46,7 @@ pub struct Process {
pub pid: pid_t, pub pid: pid_t,
pub exit_code: i32, pub exit_code: i32,
pub exit_watchers: Vec<Sender<i32>>,
pub oci: OCIProcess, pub oci: OCIProcess,
pub logger: Logger, pub logger: Logger,
} }
@ -95,6 +97,7 @@ impl Process {
init, init,
pid: -1, pid: -1,
exit_code: 0, exit_code: 0,
exit_watchers: Vec::new(),
oci: ocip.clone(), oci: ocip.clone(),
logger: logger.clone(), logger: logger.clone(),
}; };

View File

@ -4,6 +4,7 @@
// //
use std::path::Path; use std::path::Path;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use ttrpc; use ttrpc;
@ -365,6 +366,7 @@ impl agentService {
let pid: pid_t; let pid: pid_t;
let mut exit_pipe_r: RawFd = -1; let mut exit_pipe_r: RawFd = -1;
let mut buf: Vec<u8> = vec![0, 1]; let mut buf: Vec<u8> = vec![0, 1];
let (exit_send, exit_recv) = channel();
info!( info!(
sl!(), sl!(),
@ -382,6 +384,7 @@ impl agentService {
exit_pipe_r = p.exit_pipe_r.unwrap(); exit_pipe_r = p.exit_pipe_r.unwrap();
} }
p.exit_watchers.push(exit_send);
pid = p.pid; pid = p.pid;
} }
@ -398,9 +401,16 @@ impl agentService {
} }
}; };
// need to close all fds let mut p = match ctr.processes.get_mut(&pid) {
let mut p = ctr.processes.get_mut(&pid).unwrap(); Some(p) => p,
None => {
// Lost race, pick up exit code from channel
resp.status = exit_recv.recv().unwrap();
return Ok(resp);
}
};
// need to close all fds
if p.parent_stdin.is_some() { if p.parent_stdin.is_some() {
let _ = unistd::close(p.parent_stdin.unwrap()); let _ = unistd::close(p.parent_stdin.unwrap());
} }
@ -427,6 +437,11 @@ impl agentService {
p.term_master = None; p.term_master = None;
resp.status = p.exit_code; resp.status = p.exit_code;
// broadcast exit code to all parallel watchers
for s in p.exit_watchers.iter() {
// Just ignore errors in case any watcher quits unexpectedly
let _ = s.send(p.exit_code);
}
ctr.processes.remove(&pid); ctr.processes.remove(&pid);