mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-25 14:23:11 +00:00 
			
		
		
		
	agent: refactor process IO processing
Move closing IO into process.rs and use macro to reduce codes. Fixes: #2944 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
		| @@ -24,6 +24,16 @@ use tokio::io::{split, ReadHalf, WriteHalf}; | |||||||
| use tokio::sync::Mutex; | use tokio::sync::Mutex; | ||||||
| use tokio::sync::Notify; | use tokio::sync::Notify; | ||||||
|  |  | ||||||
|  | macro_rules! close_process_stream { | ||||||
|  |     ($self: ident, $stream:ident, $stream_type: ident) => { | ||||||
|  |         if $self.$stream.is_some() { | ||||||
|  |             $self.close_stream(StreamType::$stream_type); | ||||||
|  |             let _ = unistd::close($self.$stream.unwrap()); | ||||||
|  |             $self.$stream = None; | ||||||
|  |         } | ||||||
|  |     }; | ||||||
|  | } | ||||||
|  |  | ||||||
| #[derive(Debug, PartialEq, Eq, Hash, Clone)] | #[derive(Debug, PartialEq, Eq, Hash, Clone)] | ||||||
| pub enum StreamType { | pub enum StreamType { | ||||||
|     Stdin, |     Stdin, | ||||||
| @@ -147,6 +157,22 @@ impl Process { | |||||||
|         notify.notify_one(); |         notify.notify_one(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn close_stdin(&mut self) { | ||||||
|  |         close_process_stream!(self, term_master, TermMaster); | ||||||
|  |         close_process_stream!(self, parent_stdin, ParentStdin); | ||||||
|  |  | ||||||
|  |         self.notify_term_close(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn cleanup_process_stream(&mut self) { | ||||||
|  |         close_process_stream!(self, parent_stdin, ParentStdin); | ||||||
|  |         close_process_stream!(self, parent_stdout, ParentStdout); | ||||||
|  |         close_process_stream!(self, parent_stderr, ParentStderr); | ||||||
|  |         close_process_stream!(self, term_master, TermMaster); | ||||||
|  |  | ||||||
|  |         self.notify_term_close(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> { |     fn get_fd(&self, stream_type: &StreamType) -> Option<RawFd> { | ||||||
|         match stream_type { |         match stream_type { | ||||||
|             StreamType::Stdin => self.stdin, |             StreamType::Stdin => self.stdin, | ||||||
|   | |||||||
| @@ -433,7 +433,7 @@ impl AgentService { | |||||||
|             .get_container(&cid) |             .get_container(&cid) | ||||||
|             .ok_or_else(|| anyhow!("Invalid container id"))?; |             .ok_or_else(|| anyhow!("Invalid container id"))?; | ||||||
|  |  | ||||||
|         let mut p = match ctr.processes.get_mut(&pid) { |         let p = match ctr.processes.get_mut(&pid) { | ||||||
|             Some(p) => p, |             Some(p) => p, | ||||||
|             None => { |             None => { | ||||||
|                 // Lost race, pick up exit code from channel |                 // Lost race, pick up exit code from channel | ||||||
| @@ -444,7 +444,7 @@ impl AgentService { | |||||||
|  |  | ||||||
|         // need to close all fd |         // need to close all fd | ||||||
|         // ignore errors for some fd might be closed by stream |         // ignore errors for some fd might be closed by stream | ||||||
|         let _ = cleanup_process(&mut p); |         p.cleanup_process_stream(); | ||||||
|  |  | ||||||
|         resp.status = p.exit_code; |         resp.status = p.exit_code; | ||||||
|         // broadcast exit code to all parallel watchers |         // broadcast exit code to all parallel watchers | ||||||
| @@ -776,19 +776,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { | |||||||
|             ) |             ) | ||||||
|         })?; |         })?; | ||||||
|  |  | ||||||
|         if p.term_master.is_some() { |         p.close_stdin(); | ||||||
|             p.close_stream(StreamType::TermMaster); |  | ||||||
|             let _ = unistd::close(p.term_master.unwrap()); |  | ||||||
|             p.term_master = None; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if p.parent_stdin.is_some() { |  | ||||||
|             p.close_stream(StreamType::ParentStdin); |  | ||||||
|             let _ = unistd::close(p.parent_stdin.unwrap()); |  | ||||||
|             p.parent_stdin = None; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         p.notify_term_close(); |  | ||||||
|  |  | ||||||
|         Ok(Empty::new()) |         Ok(Empty::new()) | ||||||
|     } |     } | ||||||
| @@ -1661,11 +1649,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> { | |||||||
|         readonly: spec_root.readonly, |         readonly: spec_root.readonly, | ||||||
|     }); |     }); | ||||||
|  |  | ||||||
|     info!( |  | ||||||
|         sl!(), |  | ||||||
|         "{:?}", |  | ||||||
|         spec.process.as_ref().unwrap().console_size.as_ref() |  | ||||||
|     ); |  | ||||||
|     let _ = spec.save(config_path.to_str().unwrap()); |     let _ = spec.save(config_path.to_str().unwrap()); | ||||||
|  |  | ||||||
|     let olddir = unistd::getcwd().context("cannot getcwd")?; |     let olddir = unistd::getcwd().context("cannot getcwd")?; | ||||||
| @@ -1674,37 +1657,6 @@ fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> { | |||||||
|     Ok(olddir) |     Ok(olddir) | ||||||
| } | } | ||||||
|  |  | ||||||
| fn cleanup_process(p: &mut Process) -> Result<()> { |  | ||||||
|     if p.parent_stdin.is_some() { |  | ||||||
|         p.close_stream(StreamType::ParentStdin); |  | ||||||
|         unistd::close(p.parent_stdin.unwrap())?; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     if p.parent_stdout.is_some() { |  | ||||||
|         p.close_stream(StreamType::ParentStdout); |  | ||||||
|         unistd::close(p.parent_stdout.unwrap())?; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     if p.parent_stderr.is_some() { |  | ||||||
|         p.close_stream(StreamType::ParentStderr); |  | ||||||
|         unistd::close(p.parent_stderr.unwrap())?; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     if p.term_master.is_some() { |  | ||||||
|         p.close_stream(StreamType::TermMaster); |  | ||||||
|         unistd::close(p.term_master.unwrap())?; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     p.notify_term_close(); |  | ||||||
|  |  | ||||||
|     p.parent_stdin = None; |  | ||||||
|     p.parent_stdout = None; |  | ||||||
|     p.parent_stderr = None; |  | ||||||
|     p.term_master = None; |  | ||||||
|  |  | ||||||
|     Ok(()) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> { | fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> { | ||||||
|     if module.name.is_empty() { |     if module.name.is_empty() { | ||||||
|         return Err(anyhow!("Kernel module name is empty")); |         return Err(anyhow!("Kernel module name is empty")); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user