mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-31 17:37:20 +00:00 
			
		
		
		
	agent: exit from exec hangs if background process is present
This is the Rust port of https://github.com/kata-containers/agent/pull/371. `read_stdout`/`read_stderr` are blocking RPC calls: if the exec process has exited, these calls stay blocked reading from the process's term master fd and never get a chance to break out of the wait. In this PR, `read_stdout`/`read_stderr` no longer read directly from a process's term master; instead, they first obtain the fd to read from via the newly added `epoller.poll()`. `epoller.poll()` may return: - the term master fd of the exec process, if the process is running. - an fd (the read end of a pipe) that returns EOF when read, indicating that the process has exited. Fixes: #1160 Signed-off-by: bin liu <bin@hyper.sh>
This commit is contained in:
		
							
								
								
									
										11
									
								
								src/agent/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										11
									
								
								src/agent/Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -248,6 +248,16 @@ version = "1.6.1" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" | ||||
|  | ||||
| [[package]] | ||||
| name = "epoll" | ||||
| version = "4.3.1" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "20df693c700404f7e19d4d6fae6b15215d2913c27955d2b9d6f2c0f537511cd0" | ||||
| dependencies = [ | ||||
|  "bitflags", | ||||
|  "libc", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "errno" | ||||
| version = "0.2.6" | ||||
| @@ -921,6 +931,7 @@ dependencies = [ | ||||
|  "caps", | ||||
|  "cgroups", | ||||
|  "dirs", | ||||
|  "epoll", | ||||
|  "lazy_static", | ||||
|  "libc", | ||||
|  "nix 0.17.0", | ||||
|   | ||||
| @@ -26,6 +26,7 @@ dirs = "3.0.1" | ||||
| anyhow = "1.0.32" | ||||
| cgroups = { git = "https://github.com/kata-containers/cgroups-rs", branch = "stable-0.1.1"} | ||||
| tempfile = "3.1.0" | ||||
| epoll = "4.3.1" | ||||
|  | ||||
| [dev-dependencies] | ||||
| serial_test = "0.5.0" | ||||
|   | ||||
| @@ -41,6 +41,7 @@ pub mod cgroups; | ||||
| pub mod container; | ||||
| pub mod mount; | ||||
| pub mod process; | ||||
| pub mod reaper; | ||||
| pub mod specconv; | ||||
| pub mod sync; | ||||
| pub mod validator; | ||||
|   | ||||
| @@ -14,6 +14,7 @@ use nix::sys::wait::{self, WaitStatus}; | ||||
| use nix::unistd::{self, Pid}; | ||||
| use nix::Result; | ||||
|  | ||||
| use crate::reaper::Epoller; | ||||
| use oci::Process as OCIProcess; | ||||
| use slog::Logger; | ||||
|  | ||||
| @@ -40,6 +41,7 @@ pub struct Process { | ||||
|     pub exit_watchers: Vec<Sender<i32>>, | ||||
|     pub oci: OCIProcess, | ||||
|     pub logger: Logger, | ||||
|     pub epoller: Option<Epoller>, | ||||
| } | ||||
|  | ||||
| pub trait ProcessOperations { | ||||
| @@ -91,6 +93,7 @@ impl Process { | ||||
|             exit_watchers: Vec::new(), | ||||
|             oci: ocip.clone(), | ||||
|             logger: logger.clone(), | ||||
|             epoller: None, | ||||
|         }; | ||||
|  | ||||
|         info!(logger, "before create console socket!"); | ||||
| @@ -112,6 +115,29 @@ impl Process { | ||||
|         } | ||||
|         Ok(p) | ||||
|     } | ||||
|  | ||||
|     /// Takes the epoller out of this process, if one is set, and closes it. | ||||
|     pub fn close_epoller(&mut self) { | ||||
|         if let Some(epoller) = self.epoller.take() { | ||||
|             epoller.close(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn create_epoller(&mut self) -> anyhow::Result<()> { | ||||
|         match self.term_master { | ||||
|             Some(term_master) => { | ||||
|                 // Build an epoller that watches the terminal master fd and attach it to this process. | ||||
|                 let epoller = Epoller::new(&self.logger, term_master)?; | ||||
|                 self.epoller = Some(epoller) | ||||
|             } | ||||
|             None => { | ||||
|                 info!( | ||||
|                     self.logger, | ||||
|                     "try to add epoller to a process without a term master fd" | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn create_extended_pipe(flags: OFlag, pipe_size: i32) -> Result<(RawFd, RawFd)> { | ||||
|   | ||||
							
								
								
									
										150
									
								
								src/agent/rustjail/src/reaper.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										150
									
								
								src/agent/rustjail/src/reaper.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,150 @@ | ||||
| // Copyright (c) 2020 Ant Group | ||||
| // | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
| // | ||||
|  | ||||
| use nix::fcntl::OFlag; | ||||
| use slog::Logger; | ||||
|  | ||||
| use nix::unistd; | ||||
| use std::os::unix::io::RawFd; | ||||
|  | ||||
| use anyhow::Result; | ||||
|  | ||||
| const MAX_EVENTS: usize = 2; | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct Epoller { | ||||
|     logger: Logger, | ||||
|     epoll_fd: RawFd, | ||||
|     // rfd and wfd are the two ends of a pipe used to synchronize the stdio | ||||
|     // reader (readStdio) with the exit of the process. When the process exits, | ||||
|     // the write end is closed so the reader is notified and stops waiting on | ||||
|     // the process's terminal, which may have been inherited by children that | ||||
|     // are still running. | ||||
|     // NOTE(review): Clone is derived, so copies share these raw fds; confirm close() runs only once. | ||||
|     rfd: RawFd, | ||||
|     wfd: RawFd, | ||||
| } | ||||
|  | ||||
| impl Epoller { | ||||
|     pub fn new(logger: &Logger, fd: RawFd) -> Result<Epoller> { | ||||
|         let epoll_fd = epoll::create(true)?; | ||||
|         let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?; | ||||
|  | ||||
|         let mut epoller = Self { | ||||
|             logger: logger.clone(), | ||||
|             epoll_fd, | ||||
|             rfd, | ||||
|             wfd, | ||||
|         }; | ||||
|  | ||||
|         epoller.add(rfd)?; | ||||
|         epoller.add(fd)?; | ||||
|  | ||||
|         Ok(epoller) | ||||
|     } | ||||
|  | ||||
|     pub fn close_wfd(&self) { | ||||
|         let _ = unistd::close(self.wfd); | ||||
|     } | ||||
|  | ||||
|     pub fn close(&self) { | ||||
|         let _ = unistd::close(self.rfd); | ||||
|         let _ = unistd::close(self.wfd); | ||||
|         let _ = unistd::close(self.epoll_fd); | ||||
|     } | ||||
|  | ||||
|     fn add(&mut self, fd: RawFd) -> Result<()> { | ||||
|         info!(self.logger, "Epoller add fd {}", fd); | ||||
|         // Register fd with this epoller's epoll instance. It is used for the process's | ||||
|         // pty master and the read end of its exit-notification pipe; no EPOLLET flag is | ||||
|         // set, so both are registered with level-triggered notification. | ||||
|         epoll::ctl( | ||||
|             self.epoll_fd, | ||||
|             epoll::ControlOptions::EPOLL_CTL_ADD, | ||||
|             fd, | ||||
|             epoll::Event::new( | ||||
|                 epoll::Events::EPOLLHUP | ||||
|                     | epoll::Events::EPOLLIN | ||||
|                     | epoll::Events::EPOLLERR | ||||
|                     | epoll::Events::EPOLLRDHUP, | ||||
|                 fd as u64, | ||||
|             ), | ||||
|         )?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // Waits (with no timeout) for an event and returns the fd the caller should read from. | ||||
|     // Three cases are possible once the wait returns: | ||||
|     // a: only the pty's master has an event (an fd other than self.rfd); | ||||
|     // b: only the pipe has an event (self.rfd); | ||||
|     // c: both the pty and the pipe have events. | ||||
|     // In case a there is output on the process's terminal, so the terminal fd is returned | ||||
|     // to be read and sent out. In case b the process has exited and the terminal holds no | ||||
|     // data, so self.rfd is returned and reading it yields "EOF" to end the io. In case c | ||||
|     // the process has exited but the terminal still holds unsent data, so the terminal fd | ||||
|     // is returned first; the "EOF" from the pipe is expected on a later poll. | ||||
|     pub fn poll(&self) -> Result<RawFd> { | ||||
|         let mut rfd = self.rfd; | ||||
|         let mut epoll_events = vec![epoll::Event::new(epoll::Events::empty(), 0); MAX_EVENTS]; | ||||
|  | ||||
|         loop { | ||||
|             let event_count = match epoll::wait(self.epoll_fd, -1, epoll_events.as_mut_slice()) { | ||||
|                 Ok(ec) => ec, | ||||
|                 Err(e) => { | ||||
|                     info!(self.logger, "loop wait err {:?}", e); | ||||
|                     // EINTR: the wait was interrupted by a signal handler before any | ||||
|                     // requested event occurred, so simply wait again. | ||||
|                     if e.kind() == std::io::ErrorKind::Interrupted { | ||||
|                         continue; | ||||
|                     } | ||||
|                     return Err(e.into()); | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             for event in epoll_events.iter().take(event_count) { | ||||
|                 let fd = event.data as i32; | ||||
|                 // rfd starts out as the read end of the process's exit pipe. If any | ||||
|                 // reported event belongs to another fd (the process's terminal), that fd | ||||
|                 // takes priority and is returned; otherwise the process has exited and | ||||
|                 // nothing is left to read from its terminal. | ||||
|                 if fd != rfd { | ||||
|                     rfd = fd; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|             break; | ||||
|         } | ||||
|  | ||||
|         Ok(rfd) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::Epoller; | ||||
|     use nix::fcntl::OFlag; | ||||
|     use nix::unistd; | ||||
|     use std::thread; | ||||
|  | ||||
|     #[test] | ||||
|     fn test_epoller_poll() { | ||||
|         let logger = slog::Logger::root(slog::Discard, o!()); | ||||
|         let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); | ||||
|         let epoller = Epoller::new(&logger, rfd).unwrap(); | ||||
|  | ||||
|         let child = thread::spawn(move || { | ||||
|             let _ = unistd::write(wfd, "temporary file's content".as_bytes()); | ||||
|         }); | ||||
|  | ||||
|         // Wait for the writer thread to finish so the data is already pending when poll() runs. | ||||
|         let _ = child.join(); | ||||
|  | ||||
|         let fd = epoller.poll().unwrap(); | ||||
|         assert_eq!(fd, rfd, "Should get rfd"); | ||||
|  | ||||
|         epoller.close(); | ||||
|     } | ||||
| } | ||||
| @@ -306,6 +306,7 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result | ||||
|                         continue 'outer; | ||||
|                     } | ||||
|                 }; | ||||
|                 info!(logger, "wait_status"; "wait_status result" => format!("{:?}", wait_status)); | ||||
|  | ||||
|                 let pid = wait_status.pid(); | ||||
|                 if let Some(pid) = pid { | ||||
| @@ -342,6 +343,13 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result | ||||
|  | ||||
|                     p.exit_code = ret; | ||||
|                     let _ = unistd::close(pipe_write); | ||||
|  | ||||
|                     if let Some(ref poller) = p.epoller { | ||||
|                         info!(logger, "close epoller"); | ||||
|                         // close the socket file to notify readStdio to close terminal specifically | ||||
|                         // in case this process's terminal has been inherited by its children. | ||||
|                         poller.close_wfd() | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ use protocols::types::Interface; | ||||
| use rustjail::cgroups::notifier; | ||||
| use rustjail::container::{BaseContainer, Container, LinuxContainer}; | ||||
| use rustjail::process::Process; | ||||
| use rustjail::reaper; | ||||
| use rustjail::specconv::CreateOpts; | ||||
|  | ||||
| use nix::errno::Errno; | ||||
| @@ -189,10 +190,14 @@ impl agentService { | ||||
|             let cg_path = ctr.cgroup_manager.as_ref().unwrap().get_cg_path("memory"); | ||||
|             if cg_path.is_some() { | ||||
|                 let rx = notifier::notify_oom(cid.as_str(), cg_path.unwrap())?; | ||||
|                 s.run_oom_event_monitor(rx, cid); | ||||
|                 s.run_oom_event_monitor(rx, cid.clone()); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // set epoller | ||||
|         let p = find_process(&mut s, cid.as_str(), "", true)?; | ||||
|         p.create_epoller()?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
| @@ -272,7 +277,7 @@ impl agentService { | ||||
|         let cid = req.container_id.clone(); | ||||
|         let exec_id = req.exec_id.clone(); | ||||
|  | ||||
|         info!(sl!(), "cid: {} eid: {}", cid, exec_id); | ||||
|         info!(sl!(), "do_exec_process cid: {} eid: {}", cid, exec_id); | ||||
|  | ||||
|         let s = self.sandbox.clone(); | ||||
|         let mut sandbox = s.lock().unwrap(); | ||||
| @@ -293,6 +298,10 @@ impl agentService { | ||||
|  | ||||
|         ctr.run(p)?; | ||||
|  | ||||
|         // set epoller | ||||
|         let p = find_process(&mut sandbox, cid.as_str(), exec_id.as_str(), false)?; | ||||
|         p.create_epoller()?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
| @@ -403,6 +412,8 @@ impl agentService { | ||||
|             let _ = unistd::close(p.exit_pipe_r.unwrap()); | ||||
|         } | ||||
|  | ||||
|         p.close_epoller(); | ||||
|  | ||||
|         p.parent_stdin = None; | ||||
|         p.parent_stdout = None; | ||||
|         p.parent_stderr = None; | ||||
| @@ -427,13 +438,6 @@ impl agentService { | ||||
|         let cid = req.container_id.clone(); | ||||
|         let eid = req.exec_id.clone(); | ||||
|  | ||||
|         info!( | ||||
|             sl!(), | ||||
|             "write stdin"; | ||||
|             "container-id" => cid.clone(), | ||||
|             "exec-id" => eid.clone() | ||||
|         ); | ||||
|  | ||||
|         let s = self.sandbox.clone(); | ||||
|         let mut sandbox = s.lock().unwrap(); | ||||
|         let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false)?; | ||||
| @@ -477,6 +481,7 @@ impl agentService { | ||||
|         let eid = req.exec_id; | ||||
|  | ||||
|         let mut fd: RawFd = -1; | ||||
|         let mut epoller: Option<reaper::Epoller> = None; | ||||
|         { | ||||
|             let s = self.sandbox.clone(); | ||||
|             let mut sandbox = s.lock().unwrap(); | ||||
| @@ -485,6 +490,7 @@ impl agentService { | ||||
|  | ||||
|             if p.term_master.is_some() { | ||||
|                 fd = p.term_master.unwrap(); | ||||
|                 epoller = p.epoller.clone(); | ||||
|             } else if stdout { | ||||
|                 if p.parent_stdout.is_some() { | ||||
|                     fd = p.parent_stdout.unwrap(); | ||||
| @@ -494,6 +500,17 @@ impl agentService { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if let Some(epoller) = epoller { | ||||
|             // The process's epoller's poll() will return a file descriptor of the process's | ||||
|             // terminal or one end of its exited pipe. If it returns its terminal, it means | ||||
|             // there is data needed to be read out or it has been closed; if it returns the | ||||
|             // process's exited pipe, it means the process has exited and there is no data | ||||
|             // needed to be read out in its terminal, thus following read on it will read out | ||||
|             // "EOF" to terminate this process's io since the other end of this pipe has been | ||||
|             // closed in reap(). | ||||
|             fd = epoller.poll()?; | ||||
|         } | ||||
|  | ||||
|         if fd == -1 { | ||||
|             return Err(anyhow!(nix::Error::from_errno(nix::errno::Errno::EINVAL))); | ||||
|         } | ||||
| @@ -808,6 +825,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { | ||||
|             p.parent_stdin = None; | ||||
|         } | ||||
|  | ||||
|         p.close_epoller(); | ||||
|  | ||||
|         Ok(Empty::new()) | ||||
|     } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user