agent: refactor process IO processing

Move closing IO into process.rs and use macro
to reduce codes.

Fixes: #2944

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
bin 2021-11-02 15:49:11 +08:00
parent 1c81d7e0b6
commit 1e331f7542
2 changed files with 29 additions and 51 deletions

View File

@ -24,6 +24,16 @@ use tokio::io::{split, ReadHalf, WriteHalf};
use tokio::sync::Mutex;
use tokio::sync::Notify;
macro_rules! close_process_stream {
($self: ident, $stream:ident, $stream_type: ident) => {
if $self.$stream.is_some() {
$self.close_stream(StreamType::$stream_type);
let _ = unistd::close($self.$stream.unwrap());
$self.$stream = None;
}
};
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum StreamType {
Stdin,
@ -147,6 +157,22 @@ impl Process {
notify.notify_one();
}
pub fn close_stdin(&mut self) {
close_process_stream!(self, term_master, TermMaster);
close_process_stream!(self, parent_stdin, ParentStdin);
self.notify_term_close();
}
pub fn cleanup_process_stream(&mut self) {
close_process_stream!(self, parent_stdin, ParentStdin);
close_process_stream!(self, parent_stdout, ParentStdout);
close_process_stream!(self, parent_stderr, ParentStderr);
close_process_stream!(self, term_master, TermMaster);
self.notify_term_close();
}
fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> {
match stream_type {
StreamType::Stdin => self.stdin,

View File

@ -433,7 +433,7 @@ impl AgentService {
.get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?;
let mut p = match ctr.processes.get_mut(&pid) {
let p = match ctr.processes.get_mut(&pid) {
Some(p) => p,
None => {
// Lost race, pick up exit code from channel
@ -444,7 +444,7 @@ impl AgentService {
// need to close all fd
// ignore errors for some fd might be closed by stream
let _ = cleanup_process(&mut p);
p.cleanup_process_stream();
resp.status = p.exit_code;
// broadcast exit code to all parallel watchers
@ -776,19 +776,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
)
})?;
if p.term_master.is_some() {
p.close_stream(StreamType::TermMaster);
let _ = unistd::close(p.term_master.unwrap());
p.term_master = None;
}
if p.parent_stdin.is_some() {
p.close_stream(StreamType::ParentStdin);
let _ = unistd::close(p.parent_stdin.unwrap());
p.parent_stdin = None;
}
p.notify_term_close();
p.close_stdin();
Ok(Empty::new())
}
@ -1661,11 +1649,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> {
readonly: spec_root.readonly,
});
info!(
sl!(),
"{:?}",
spec.process.as_ref().unwrap().console_size.as_ref()
);
let _ = spec.save(config_path.to_str().unwrap());
let olddir = unistd::getcwd().context("cannot getcwd")?;
@ -1674,37 +1657,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> {
Ok(olddir)
}
fn cleanup_process(p: &mut Process) -> Result<()> {
if p.parent_stdin.is_some() {
p.close_stream(StreamType::ParentStdin);
unistd::close(p.parent_stdin.unwrap())?;
}
if p.parent_stdout.is_some() {
p.close_stream(StreamType::ParentStdout);
unistd::close(p.parent_stdout.unwrap())?;
}
if p.parent_stderr.is_some() {
p.close_stream(StreamType::ParentStderr);
unistd::close(p.parent_stderr.unwrap())?;
}
if p.term_master.is_some() {
p.close_stream(StreamType::TermMaster);
unistd::close(p.term_master.unwrap())?;
}
p.notify_term_close();
p.parent_stdin = None;
p.parent_stdout = None;
p.parent_stderr = None;
p.term_master = None;
Ok(())
}
fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> {
if module.name.is_empty() {
return Err(anyhow!("Kernel module name is empty"));