diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 5f321b28e8..f647dc7fdf 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -55,7 +55,7 @@ use crate::pipestream::PipeStream; use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SYNC_SUCCESS}; use crate::sync_with_async::{read_async, write_async}; use async_trait::async_trait; -use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncBufReadExt; const STATE_FILENAME: &str = "state.json"; const EXEC_FIFO_FILENAME: &str = "exec.fifo"; @@ -1513,8 +1513,10 @@ fn set_sysctls(sysctls: &HashMap) -> Result<()> { Ok(()) } +use std::io::Read; use std::os::unix::process::ExitStatusExt; use std::process::Stdio; +use std::sync::mpsc::{self, RecvTimeoutError}; use std::time::Duration; async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { @@ -1536,11 +1538,9 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { let _ = unistd::close(wfd); }); - let mut pipe_r = PipeStream::from_fd(rfd); - let mut pipe_w = PipeStream::from_fd(wfd); - match unistd::fork()? { ForkResult::Parent { child } => { + let mut pipe_r = PipeStream::from_fd(rfd); let buf = read_async(&mut pipe_r).await?; let status = if buf.len() == 4 { let buf_array: [u8; 4] = [buf[0], buf[1], buf[2], buf[3]]; @@ -1565,13 +1565,13 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { } ForkResult::Child => { - let (mut tx, mut rx) = tokio::sync::mpsc::channel(100); - let (tx_logger, rx_logger) = tokio::sync::oneshot::channel(); + let (tx, rx) = mpsc::channel(); + let (tx_logger, rx_logger) = mpsc::channel(); tx_logger.send(logger.clone()).unwrap(); - let handle = tokio::spawn(async move { - let logger = rx_logger.await.unwrap(); + let handle = std::thread::spawn(move || { + let logger = rx_logger.recv().unwrap(); // write oci state to child let env: HashMap = envs @@ -1582,7 +1582,7 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { }) .collect(); - let mut child = tokio::process::Command::new(path.to_str().unwrap()) + let mut child = std::process::Command::new(path.to_str().unwrap()) .args(args.iter()) .envs(env.iter()) .stdin(Stdio::piped()) @@ -1592,7 +1592,7 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { .unwrap(); // send out our pid - tx.send(child.id() as libc::pid_t).await.unwrap(); + tx.send(child.id() as libc::pid_t).unwrap(); info!(logger, "hook grand: {}", child.id()); child @@ -1600,7 +1600,6 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { .as_mut() .unwrap() .write_all(state.as_bytes()) - .await .unwrap(); // read something from stdout for debug @@ -1610,10 +1609,9 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { .as_mut() .unwrap() .read_to_string(&mut out) - .await .unwrap(); info!(logger, "child stdout: {}", out.as_str()); - match child.await { + match child.wait() { Ok(exit) => { let code: i32 = if exit.success() { 0 @@ -1624,7 +1622,7 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { } }; - tx.send(code).await.unwrap(); + tx.send(code).unwrap(); } Err(e) => { @@ -1644,33 +1642,29 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { // -- FIXME // just in case. Should not happen any more - tx.send(0).await.unwrap(); + tx.send(0).unwrap(); } } }); - let pid = rx.recv().await.unwrap(); + let pid = rx.recv().unwrap(); info!(logger, "hook grand: {}", pid); let status = { if let Some(timeout) = h.timeout { - let timeout = tokio::time::delay_for(Duration::from_secs(timeout as u64)); - tokio::select! { - v = rx.recv() => { - match v { - Some(s) => s, - None => { - let _ = signal::kill(Pid::from_raw(pid), Some(Signal::SIGKILL)); - -libc::EPIPE - } - } - } - _ = timeout => { + match rx.recv_timeout(Duration::from_secs(timeout as u64)) { + Ok(s) => s, + Err(e) => { + let error = if e == RecvTimeoutError::Timeout { + -libc::ETIMEDOUT + } else { + -libc::EPIPE + }; let _ = signal::kill(Pid::from_raw(pid), Some(Signal::SIGKILL)); - -libc::ETIMEDOUT + error } } - } else if let Some(s) = rx.recv().await { + } else if let Ok(s) = rx.recv() { s } else { let _ = signal::kill(Pid::from_raw(pid), Some(Signal::SIGKILL)); @@ -1678,13 +1672,12 @@ async fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> { } }; - handle.await.unwrap(); - let _ = write_async( - &mut pipe_w, + handle.join().unwrap(); + let _ = write_sync( + wfd, SYNC_DATA, std::str::from_utf8(&status.to_be_bytes()).unwrap_or_default(), - ) - .await; + ); std::process::exit(0); } }