mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-05-01 13:14:33 +00:00
agent: refactor process IO processing
Move closing IO into process.rs and use macro to reduce codes. Fixes: #2944 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
1c81d7e0b6
commit
1e331f7542
@ -24,6 +24,16 @@ use tokio::io::{split, ReadHalf, WriteHalf};
|
|||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
|
macro_rules! close_process_stream {
|
||||||
|
($self: ident, $stream:ident, $stream_type: ident) => {
|
||||||
|
if $self.$stream.is_some() {
|
||||||
|
$self.close_stream(StreamType::$stream_type);
|
||||||
|
let _ = unistd::close($self.$stream.unwrap());
|
||||||
|
$self.$stream = None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||||
pub enum StreamType {
|
pub enum StreamType {
|
||||||
Stdin,
|
Stdin,
|
||||||
@ -147,6 +157,22 @@ impl Process {
|
|||||||
notify.notify_one();
|
notify.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn close_stdin(&mut self) {
|
||||||
|
close_process_stream!(self, term_master, TermMaster);
|
||||||
|
close_process_stream!(self, parent_stdin, ParentStdin);
|
||||||
|
|
||||||
|
self.notify_term_close();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cleanup_process_stream(&mut self) {
|
||||||
|
close_process_stream!(self, parent_stdin, ParentStdin);
|
||||||
|
close_process_stream!(self, parent_stdout, ParentStdout);
|
||||||
|
close_process_stream!(self, parent_stderr, ParentStderr);
|
||||||
|
close_process_stream!(self, term_master, TermMaster);
|
||||||
|
|
||||||
|
self.notify_term_close();
|
||||||
|
}
|
||||||
|
|
||||||
fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> {
|
fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> {
|
||||||
match stream_type {
|
match stream_type {
|
||||||
StreamType::Stdin => self.stdin,
|
StreamType::Stdin => self.stdin,
|
||||||
|
@ -433,7 +433,7 @@ impl AgentService {
|
|||||||
.get_container(&cid)
|
.get_container(&cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
||||||
|
|
||||||
let mut p = match ctr.processes.get_mut(&pid) {
|
let p = match ctr.processes.get_mut(&pid) {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => {
|
None => {
|
||||||
// Lost race, pick up exit code from channel
|
// Lost race, pick up exit code from channel
|
||||||
@ -444,7 +444,7 @@ impl AgentService {
|
|||||||
|
|
||||||
// need to close all fd
|
// need to close all fd
|
||||||
// ignore errors for some fd might be closed by stream
|
// ignore errors for some fd might be closed by stream
|
||||||
let _ = cleanup_process(&mut p);
|
p.cleanup_process_stream();
|
||||||
|
|
||||||
resp.status = p.exit_code;
|
resp.status = p.exit_code;
|
||||||
// broadcast exit code to all parallel watchers
|
// broadcast exit code to all parallel watchers
|
||||||
@ -776,19 +776,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if p.term_master.is_some() {
|
p.close_stdin();
|
||||||
p.close_stream(StreamType::TermMaster);
|
|
||||||
let _ = unistd::close(p.term_master.unwrap());
|
|
||||||
p.term_master = None;
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.parent_stdin.is_some() {
|
|
||||||
p.close_stream(StreamType::ParentStdin);
|
|
||||||
let _ = unistd::close(p.parent_stdin.unwrap());
|
|
||||||
p.parent_stdin = None;
|
|
||||||
}
|
|
||||||
|
|
||||||
p.notify_term_close();
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1661,11 +1649,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> {
|
|||||||
readonly: spec_root.readonly,
|
readonly: spec_root.readonly,
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
|
||||||
sl!(),
|
|
||||||
"{:?}",
|
|
||||||
spec.process.as_ref().unwrap().console_size.as_ref()
|
|
||||||
);
|
|
||||||
let _ = spec.save(config_path.to_str().unwrap());
|
let _ = spec.save(config_path.to_str().unwrap());
|
||||||
|
|
||||||
let olddir = unistd::getcwd().context("cannot getcwd")?;
|
let olddir = unistd::getcwd().context("cannot getcwd")?;
|
||||||
@ -1674,37 +1657,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> {
|
|||||||
Ok(olddir)
|
Ok(olddir)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cleanup_process(p: &mut Process) -> Result<()> {
|
|
||||||
if p.parent_stdin.is_some() {
|
|
||||||
p.close_stream(StreamType::ParentStdin);
|
|
||||||
unistd::close(p.parent_stdin.unwrap())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.parent_stdout.is_some() {
|
|
||||||
p.close_stream(StreamType::ParentStdout);
|
|
||||||
unistd::close(p.parent_stdout.unwrap())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.parent_stderr.is_some() {
|
|
||||||
p.close_stream(StreamType::ParentStderr);
|
|
||||||
unistd::close(p.parent_stderr.unwrap())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.term_master.is_some() {
|
|
||||||
p.close_stream(StreamType::TermMaster);
|
|
||||||
unistd::close(p.term_master.unwrap())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
p.notify_term_close();
|
|
||||||
|
|
||||||
p.parent_stdin = None;
|
|
||||||
p.parent_stdout = None;
|
|
||||||
p.parent_stderr = None;
|
|
||||||
p.term_master = None;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> {
|
fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> {
|
||||||
if module.name.is_empty() {
|
if module.name.is_empty() {
|
||||||
return Err(anyhow!("Kernel module name is empty"));
|
return Err(anyhow!("Kernel module name is empty"));
|
||||||
|
Loading…
Reference in New Issue
Block a user