diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs
index c23f080d13..8d77920c63 100644
--- a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs
+++ b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs
@@ -15,9 +15,10 @@ use kata_types::config::hypervisor::HYPERVISOR_NAME_CH;
 use persist::sandbox_persist::Persist;
 use std::collections::HashMap;
 use std::os::unix::net::UnixStream;
-use tokio::process::Child;
 use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
+use tokio::{process::Child, sync::mpsc};
 
 #[derive(Debug)]
 pub struct CloudHypervisorInner {
@@ -76,6 +77,9 @@ pub struct CloudHypervisorInner {
 
     /// Size of memory block of guest OS in MB (currently unused)
     pub(crate) _guest_memory_block_size_mb: u32,
+
+    pub(crate) exit_notify: Option<mpsc::Sender<i32>>,
+    pub(crate) exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
 }
 
 const CH_DEFAULT_TIMEOUT_SECS: u32 = 10;
@@ -91,6 +95,7 @@ impl CloudHypervisorInner {
         );
 
         let (tx, rx) = channel(true);
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
 
         Self {
             api_socket: None,
@@ -116,6 +121,9 @@ impl CloudHypervisorInner {
             guest_protection_to_use: GuestProtection::NoProtection,
             ch_features: None,
             _guest_memory_block_size_mb: 0,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }
 
@@ -162,6 +170,7 @@ impl Persist for CloudHypervisorInner {
         hypervisor_state: Self::State,
     ) -> Result<Self> {
         let (tx, rx) = channel(true);
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
 
         let mut ch = Self {
             config: Some(hypervisor_state.config),
@@ -180,6 +189,8 @@ impl Persist for CloudHypervisorInner {
             timeout_secs: CH_DEFAULT_TIMEOUT_SECS as i32,
             jailer_root: String::default(),
             ch_features: None,
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
             ..Default::default()
         };
 
diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
index 7764baddde..ac15e81479 100644
--- a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
+++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
@@ -37,13 +37,13 @@ use std::os::unix::net::UnixStream;
 use std::path::Path;
 use std::process::Stdio;
 use std::sync::{Arc, RwLock};
-use tokio::io::AsyncBufReadExt;
 use tokio::io::BufReader;
 use tokio::process::{Child, Command};
 use tokio::sync::watch::Receiver;
 use tokio::task;
 use tokio::task::JoinHandle;
 use tokio::time::Duration;
+use tokio::{io::AsyncBufReadExt, sync::mpsc};
 
 const CH_NAME: &str = "cloud-hypervisor";
 
@@ -410,7 +410,13 @@ impl CloudHypervisorInner {
             .map_err(|e| anyhow!(e))?
             .clone();
 
-        let ch_outputlogger_task = tokio::spawn(cloud_hypervisor_log_output(child, shutdown));
+        let exit_notify: mpsc::Sender<i32> = self
+            .exit_notify
+            .take()
+            .ok_or_else(|| anyhow!("no exit notify"))?;
+
+        let ch_outputlogger_task =
+            tokio::spawn(cloud_hypervisor_log_output(child, shutdown, exit_notify));
 
         let tasks = vec![ch_outputlogger_task];
 
@@ -651,6 +657,16 @@ impl CloudHypervisorInner {
         Ok(())
     }
 
+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        debug!(sl!(), "Waiting CH vmm");
+        let mut waiter = self.exit_waiter.lock().await;
+        if let Some(exitcode) = waiter.0.recv().await {
+            waiter.1 = exitcode;
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         Ok(())
     }
@@ -778,7 +794,11 @@ impl CloudHypervisorInner {
 // Log all output from the CH process until a shutdown signal is received.
 // When that happens, stop logging and wait for the child process to finish
 // before returning.
-async fn cloud_hypervisor_log_output(mut child: Child, mut shutdown: Receiver<bool>) -> Result<()> {
+async fn cloud_hypervisor_log_output(
+    mut child: Child,
+    mut shutdown: Receiver<bool>,
+    exit_notify: mpsc::Sender<i32>,
+) -> Result<()> {
     let stdout = child
         .stdout
         .as_mut()
@@ -833,7 +853,10 @@ async fn cloud_hypervisor_log_output(mut child: Child, mut shutdown: Receiver<bool>
diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs
+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.write().await;
         inner.pause_vm()
diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
index 83f875e126..ab51f8dcc4 100644
--- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
+++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
@@ -29,6 +29,7 @@ use nix::mount::MsFlags;
 use persist::sandbox_persist::Persist;
 use std::cmp::Ordering;
 use std::{collections::HashSet, fs::create_dir_all};
+use tokio::sync::mpsc;
 
 const DRAGONBALL_KERNEL: &str = "vmlinux";
 const DRAGONBALL_INITRD: &str = "initrd";
@@ -88,7 +89,7 @@ pub struct DragonballInner {
 }
 
 impl DragonballInner {
-    pub fn new() -> DragonballInner {
+    pub fn new(exit_notify: mpsc::Sender<i32>) -> DragonballInner {
         let mut capabilities = Capabilities::new();
         capabilities.set(
             CapabilityBits::BlockDeviceSupport
@@ -106,7 +107,7 @@ impl DragonballInner {
             pending_devices: vec![],
             state: VmmState::NotReady,
             jailed: false,
-            vmm_instance: VmmInstance::new(""),
+            vmm_instance: VmmInstance::new("", exit_notify),
             run_dir: "".to_string(),
             cached_block_devices: Default::default(),
             capabilities,
@@ -498,7 +499,7 @@ impl DragonballInner {
 #[async_trait]
 impl Persist for DragonballInner {
     type State = HypervisorState;
-    type ConstructorArgs = ();
+    type ConstructorArgs = mpsc::Sender<i32>;
 
     /// Save a state of hypervisor
     async fn save(&self) -> Result<Self::State> {
@@ -519,7 +520,7 @@ impl Persist for DragonballInner {
 
     /// Restore hypervisor
     async fn restore(
-        _hypervisor_args: Self::ConstructorArgs,
+        hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
         Ok(DragonballInner {
@@ -530,7 +531,7 @@ impl Persist for DragonballInner {
             netns: hypervisor_state.netns,
             config: hypervisor_state.config,
             state: VmmState::NotReady,
-            vmm_instance: VmmInstance::new(""),
+            vmm_instance: VmmInstance::new("", hypervisor_args),
             run_dir: hypervisor_state.run_dir,
             pending_devices: vec![],
             cached_block_devices: hypervisor_state.cached_block_devices,
diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_device.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_device.rs
index a62ee2b7a3..a9a7e28489 100644
--- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_device.rs
+++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_device.rs
@@ -7,6 +7,13 @@
 use std::convert::TryFrom;
 use std::path::PathBuf;
 
+use super::{build_dragonball_network_config, DragonballInner};
+use crate::device::pci_path::PciPath;
+use crate::VhostUserConfig;
+use crate::{
+    device::DeviceType, HybridVsockConfig, NetworkConfig, ShareFsConfig, ShareFsMountConfig,
+    ShareFsMountOperation, ShareFsMountType, VfioDevice, VmmState, JAILER_ROOT,
+};
 use anyhow::{anyhow, Context, Result};
 use dbs_utils::net::MacAddr;
 use dragonball::api::v1::VhostUserConfig as DragonballVhostUserConfig;
@@ -19,14 +26,6 @@ use dragonball::device_manager::{
     vfio_dev_mgr::{HostDeviceConfig, VfioPciDeviceConfig},
 };
 
-use super::{build_dragonball_network_config, DragonballInner};
-use crate::device::pci_path::PciPath;
-use crate::VhostUserConfig;
-use crate::{
-    device::DeviceType, HybridVsockConfig, NetworkConfig, ShareFsConfig, ShareFsMountConfig,
-    ShareFsMountOperation, ShareFsMountType, VfioDevice, VmmState, JAILER_ROOT,
-};
-
 const MB_TO_B: u32 = 1024 * 1024;
 const DEFAULT_VIRTIO_FS_NUM_QUEUES: i32 = 1;
 const DEFAULT_VIRTIO_FS_QUEUE_SIZE: i32 = 1024;
@@ -431,12 +430,14 @@ impl DragonballInner {
 #[cfg(test)]
 mod tests {
     use dragonball::api::v1::FsDeviceConfigInfo;
+    use tokio::sync::mpsc;
 
     use crate::dragonball::DragonballInner;
 
     #[test]
     fn test_parse_inline_virtiofs_args() {
-        let mut dragonball = DragonballInner::new();
+        let (tx, _) = mpsc::channel(1);
+        let mut dragonball = DragonballInner::new(tx);
         let mut fs_cfg = FsDeviceConfigInfo::default();
 
         // no_open and writeback_cache is the default, so test open and no_writeback_cache. "-d"
diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs
index ab675aef35..9313854714 100644
--- a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs
+++ b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs
@@ -23,13 +23,14 @@ use dragonball::api::v1::{
 };
 use kata_types::capabilities::{Capabilities, CapabilityBits};
 use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
-use tokio::sync::RwLock;
+use tokio::sync::{mpsc, Mutex, RwLock};
 use tracing::instrument;
 
 use crate::{DeviceType, Hypervisor, MemoryConfig, NetworkConfig, VcpuThreadIds};
 
 pub struct Dragonball {
     inner: Arc<RwLock<DragonballInner>>,
+    exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
 }
 
 impl std::fmt::Debug for Dragonball {
@@ -46,8 +47,11 @@ impl Default for Dragonball {
 
 impl Dragonball {
     pub fn new() -> Self {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Self {
-            inner: Arc::new(RwLock::new(DragonballInner::new())),
+            inner: Arc::new(RwLock::new(DragonballInner::new(exit_notify))),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }
 
@@ -81,6 +85,15 @@ impl Hypervisor for Dragonball {
         inner.stop_vm()
     }
 
+    async fn wait_vm(&self) -> Result<i32> {
+        let mut waiter = self.exit_waiter.lock().await;
+        if let Some(exit_code) = waiter.0.recv().await {
+            waiter.1 = exit_code;
+        }
+
+        Ok(waiter.1)
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()
@@ -218,12 +231,15 @@ impl Persist for Dragonball {
     }
     /// Restore a component from a specified state.
     async fn restore(
-        hypervisor_args: Self::ConstructorArgs,
+        _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
-        let inner = DragonballInner::restore(hypervisor_args, hypervisor_state).await?;
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
+        let inner = DragonballInner::restore(exit_notify, hypervisor_state).await?;
         Ok(Self {
             inner: Arc::new(RwLock::new(inner)),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }
diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs
index 183b8f57e6..4108c8896d 100644
--- a/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs
+++ b/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs
@@ -28,6 +28,7 @@ use dragonball::{
 };
 use nix::sched::{setns, CloneFlags};
 use seccompiler::BpfProgram;
+use tokio::sync::mpsc;
 use vmm_sys_util::eventfd::EventFd;
 
 use crate::ShareFsMountOperation;
@@ -49,10 +50,11 @@ pub struct VmmInstance {
     to_vmm_fd: EventFd,
     seccomp: BpfProgram,
     vmm_thread: Option<thread::JoinHandle<Result<i32>>>,
+    exit_notify: Option<mpsc::Sender<i32>>,
 }
 
 impl VmmInstance {
-    pub fn new(id: &str) -> Self {
+    pub fn new(id: &str, exit_notify: mpsc::Sender<i32>) -> Self {
         let vmm_shared_info = Arc::new(RwLock::new(InstanceInfo::new(
             String::from(id),
             DRAGONBALL_VERSION.to_string(),
@@ -68,6 +70,7 @@ impl VmmInstance {
             to_vmm_fd,
             seccomp: vec![],
             vmm_thread: None,
+            exit_notify: Some(exit_notify),
         }
     }
 
@@ -122,6 +125,7 @@ impl VmmInstance {
             )
             .expect("Failed to start vmm");
         let vmm_shared_info = self.get_shared_info();
+        let exit_notify = self.exit_notify.take().unwrap();
 
         self.vmm_thread = Some(
             thread::Builder::new()
@@ -141,6 +145,12 @@ impl VmmInstance {
                     }
 
                     let exit_code = Vmm::run_vmm_event_loop(Arc::new(Mutex::new(vmm)), vmm_service);
+                    exit_notify
+                        .try_send(exit_code)
+                        .map_err(|e| {
+                            error!(sl!(), "vmm-master thread failed to send. {:?}", e);
{:?}", e); + }) + .ok(); debug!(sl!(), "run vmm thread exited: {}", exit_code); Ok(exit_code) }() diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs index 58697aafe4..be9b7b786a 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs @@ -8,7 +8,7 @@ use crate::HypervisorState; use crate::MemoryConfig; use crate::HYPERVISOR_FIRECRACKER; use crate::{device::DeviceType, VmmState}; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use hyper::Client; use hyperlocal::{UnixClientExt, UnixConnector}; @@ -19,7 +19,12 @@ use kata_types::{ use nix::sched::{setns, CloneFlags}; use persist::sandbox_persist::Persist; use std::os::unix::io::AsRawFd; -use tokio::process::Command; +use std::process::Stdio; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::{Child, ChildStderr, Command}; +use tokio::sync::mpsc; +use tokio::sync::Mutex; unsafe impl Send for FcInner {} unsafe impl Sync for FcInner {} @@ -39,12 +44,17 @@ pub struct FcInner { pub(crate) run_dir: String, pub(crate) pending_devices: Vec, pub(crate) capabilities: Capabilities, + pub(crate) fc_process: Mutex>, + pub(crate) exit_notify: Option>, + pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl FcInner { pub fn new() -> FcInner { let mut capabilities = Capabilities::new(); capabilities.set(CapabilityBits::BlockDeviceSupport); + let (exit_notify, exit_waiter) = mpsc::channel(1); + FcInner { id: String::default(), asock_path: String::default(), @@ -59,6 +69,9 @@ impl FcInner { run_dir: String::default(), pending_devices: vec![], capabilities, + fc_process: Mutex::new(None), + exit_notify: Some(exit_notify), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -108,7 +121,15 @@ impl FcInner { }); } - let mut child = cmd.spawn()?; + let mut child = cmd.stderr(Stdio::piped()).spawn()?; + + let stderr = child.stderr.take().unwrap(); + let exit_notify: mpsc::Sender<()> = self + .exit_notify + .take() + .ok_or_else(|| anyhow!("no exit notify"))?; + + tokio::spawn(log_fc_stderr(stderr, exit_notify)); match child.id() { Some(id) => { @@ -122,8 +143,12 @@ impl FcInner { None => { let exit_status = child.wait().await?; error!(sl(), "Process exited, status: {:?}", exit_status); + return Err(anyhow!("fc vmm start failed with: {:?}", exit_status)); } }; + + self.fc_process = Mutex::new(Some(child)); + Ok(()) } @@ -167,6 +192,27 @@ impl FcInner { } } +async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Result<()> { + info!(sl!(), "starting reading fc stderr"); + + let stderr_reader = BufReader::new(stderr); + let mut stderr_lines = stderr_reader.lines(); + + while let Some(buffer) = stderr_lines + .next_line() + .await + .context("next_line() failed on fc stderr")? + { + info!(sl!(), "fc stderr: {:?}", buffer); + } + + // Notfiy the waiter the process exit. 
+    let _ = exit_notify.try_send(());
+
+    info!(sl!(), "finished reading fc stderr");
+    Ok(())
+}
+
 #[async_trait]
 impl Persist for FcInner {
     type State = HypervisorState;
@@ -189,6 +235,8 @@ impl Persist for FcInner {
         _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Ok(FcInner {
             id: hypervisor_state.id,
             asock_path: String::default(),
@@ -203,6 +251,9 @@ impl Persist for FcInner {
             pending_devices: vec![],
             run_dir: hypervisor_state.run_dir,
             capabilities: Capabilities::new(),
+            fc_process: Mutex::new(None),
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }
diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs
index d176ec43d1..bb3bbbff6c 100644
--- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs
+++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs
@@ -78,17 +78,47 @@ impl FcInner {
         debug!(sl(), "Stopping sandbox");
         if self.state != VmmState::VmRunning {
             debug!(sl(), "VM not running!");
-        } else if let Some(pid_to_kill) = &self.pid {
-            let pid = ::nix::unistd::Pid::from_raw(*pid_to_kill as i32);
-            if let Err(err) = ::nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL) {
-                if err != ::nix::Error::ESRCH {
-                    debug!(sl(), "Failed to kill VMM with pid {} {:?}", pid, err);
+        } else {
+            let mut fc_process = self.fc_process.lock().await;
+            if let Some(fc_process) = fc_process.as_mut() {
+                if fc_process.id().is_some() {
+                    info!(sl!(), "FcInner::stop_vm(): kill()'ing fc");
+                    return fc_process.kill().await.map_err(anyhow::Error::from);
+                } else {
+                    info!(
+                        sl!(),
+                        "FcInner::stop_vm(): fc process isn't running (likely stopped already)"
+                    );
                 }
+            } else {
+                info!(
+                    sl!(),
+                    "FcInner::stop_vm(): fc process isn't running (likely stopped already)"
+                );
             }
         }
+
         Ok(())
     }
 
+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        debug!(sl(), "Wait fc sandbox");
+        let mut waiter = self.exit_waiter.lock().await;
+
+        // Wait until the fc process exits.
+        waiter.0.recv().await;
+
+        let mut fc_process = self.fc_process.lock().await;
+
+        if let Some(mut fc_process) = fc_process.take() {
+            if let Ok(status) = fc_process.wait().await {
+                waiter.1 = status.code().unwrap_or(0);
+            }
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         warn!(sl(), "Pause VM: Not implemented");
         Ok(())
diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs
index a7ba3db54d..adf77964c2 100644
--- a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs
+++ b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs
@@ -67,6 +67,11 @@ impl Hypervisor for Firecracker {
         inner.stop_vm().await
     }
 
+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()
diff --git a/src/runtime-rs/crates/hypervisor/src/lib.rs b/src/runtime-rs/crates/hypervisor/src/lib.rs
index 6f0c74f74c..71e7f632f7 100644
--- a/src/runtime-rs/crates/hypervisor/src/lib.rs
+++ b/src/runtime-rs/crates/hypervisor/src/lib.rs
@@ -96,6 +96,7 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync {
     async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()>;
     async fn start_vm(&self, timeout: i32) -> Result<()>;
     async fn stop_vm(&self) -> Result<()>;
+    async fn wait_vm(&self) -> Result<i32>;
     async fn pause_vm(&self) -> Result<()>;
     async fn save_vm(&self) -> Result<()>;
     async fn resume_vm(&self) -> Result<()>;
diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs
index a31fb481cc..fde475ca70 100644
--- a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs
+++ b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::convert::TryInto;
 use std::path::Path;
 use std::process::Stdio;
+use tokio::sync::{mpsc, Mutex};
 use tokio::{
     io::{AsyncBufReadExt, BufReader},
     process::{Child, ChildStderr, Command},
@@ -34,23 +35,31 @@ pub struct QemuInner {
     /// sandbox id
     id: String,
 
-    qemu_process: Option<Child>,
+    qemu_process: Mutex<Option<Child>>,
     qmp: Option<Qmp>,
 
     config: HypervisorConfig,
     devices: Vec<DeviceType>,
     netns: Option<String>,
+
+    exit_notify: Option<mpsc::Sender<()>>,
+    exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
 }
 
 impl QemuInner {
     pub fn new() -> QemuInner {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         QemuInner {
             id: "".to_string(),
-            qemu_process: None,
+            qemu_process: Mutex::new(None),
             qmp: None,
             config: Default::default(),
             devices: Vec::new(),
             netns: None,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }
 
@@ -147,12 +156,18 @@ impl QemuInner {
             });
         }
 
-        self.qemu_process = Some(command.stderr(Stdio::piped()).spawn()?);
+        let mut qemu_process = command.stderr(Stdio::piped()).spawn()?;
+        let stderr = qemu_process.stderr.take().unwrap();
+        self.qemu_process = Mutex::new(Some(qemu_process));
+
         info!(sl!(), "qemu process started");
 
-        if let Some(ref mut qemu_process) = &mut self.qemu_process {
-            tokio::spawn(log_qemu_stderr(qemu_process.stderr.take().unwrap()));
-        }
+        let exit_notify: mpsc::Sender<()> = self
+            .exit_notify
+            .take()
+            .ok_or_else(|| anyhow!("no exit notify"))?;
+
+        tokio::spawn(log_qemu_stderr(stderr, exit_notify));
 
         match Qmp::new(QMP_SOCKET_FILE) {
             Ok(qmp) => self.qmp = Some(qmp),
@@ -167,7 +182,9 @@ impl QemuInner {
 
     pub(crate) async fn stop_vm(&mut self) -> Result<()> {
         info!(sl!(), "Stopping QEMU VM");
-        if let Some(ref mut qemu_process) = &mut self.qemu_process {
+
+        let mut qemu_process = self.qemu_process.lock().await;
+        if let Some(qemu_process) = qemu_process.as_mut() {
             let is_qemu_running = qemu_process.id().is_some();
             if is_qemu_running {
                 info!(sl!(), "QemuInner::stop_vm(): kill()'ing qemu");
@@ -184,6 +201,25 @@ impl QemuInner {
         }
     }
 
+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        info!(sl!(), "Wait QEMU VM");
+
+        let mut waiter = self.exit_waiter.lock().await;
+
+        // Wait until the qemu process exits.
+        waiter.0.recv().await;
+
+        let mut qemu_process = self.qemu_process.lock().await;
+
+        if let Some(mut qemu_process) = qemu_process.take() {
+            if let Ok(status) = qemu_process.wait().await {
+                waiter.1 = status.code().unwrap_or(0);
+            }
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         info!(sl!(), "Pausing QEMU VM");
         todo!()
@@ -224,7 +260,7 @@ impl QemuInner {
     pub(crate) async fn get_vmm_master_tid(&self) -> Result<u32> {
         info!(sl!(), "QemuInner::get_vmm_master_tid()");
 
-        if let Some(qemu_process) = &self.qemu_process {
+        if let Some(qemu_process) = self.qemu_process.lock().await.as_ref() {
             if let Some(qemu_pid) = qemu_process.id() {
                 info!(
                     sl!(),
@@ -491,7 +527,7 @@ impl QemuInner {
     }
 }
 
-async fn log_qemu_stderr(stderr: ChildStderr) -> Result<()> {
+async fn log_qemu_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Result<()> {
     info!(sl!(), "starting reading qemu stderr");
 
     let stderr_reader = BufReader::new(stderr);
@@ -505,6 +541,9 @@ async fn log_qemu_stderr(stderr: ChildStderr) -> Result<()> {
         info!(sl!(), "qemu stderr: {:?}", buffer);
     }
 
+    // Notify the waiter that the process exited.
+    let _ = exit_notify.try_send(());
+
     info!(sl!(), "finished reading qemu stderr");
     Ok(())
 }
@@ -567,13 +606,18 @@ impl Persist for QemuInner {
         _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Ok(QemuInner {
             id: hypervisor_state.id,
-            qemu_process: None,
+            qemu_process: Mutex::new(None),
             qmp: None,
             config: hypervisor_state.config,
             devices: Vec::new(),
             netns: None,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }
diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs
index 0857d303b7..32497c3584 100644
--- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs
+++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs
@@ -62,6 +62,11 @@ impl Hypervisor for Qemu {
         inner.stop_vm().await
     }
 
+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()
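Reviewer note (not part of the patch): every hypervisor above uses the same exit-notification pattern, so a minimal, self-contained sketch of it may help. The names below (FakeVmm, start, wait_vm) are illustrative only and assume tokio; the idea is a capacity-1 mpsc channel whose sender goes to the task that watches the VMM process, and whose receiver is paired with a cached exit code behind a Mutex so wait_vm() can be called more than once and still return the same result.

use tokio::sync::{mpsc, Mutex};

struct FakeVmm {
    // The watcher task sends the exit code exactly once.
    exit_notify: Option<mpsc::Sender<i32>>,
    // wait_vm() receives it once and caches it, so later callers get the same code.
    exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
}

impl FakeVmm {
    fn new() -> Self {
        let (exit_notify, exit_waiter) = mpsc::channel(1);
        Self {
            exit_notify: Some(exit_notify),
            exit_waiter: Mutex::new((exit_waiter, 0)),
        }
    }

    // Stand-in for spawning the hypervisor: hand the sender to a task that
    // outlives this call and reports the process exit code when it ends.
    fn start(&mut self) {
        let notify = self.exit_notify.take().expect("started twice");
        tokio::spawn(async move {
            // ... log stdout/stderr until the child exits ...
            let exit_code = 0;
            // try_send() never blocks; capacity 1 is enough for a single exit event.
            let _ = notify.try_send(exit_code);
        });
    }

    async fn wait_vm(&self) -> i32 {
        let mut waiter = self.exit_waiter.lock().await;
        // The first caller blocks until the watcher reports; once the sender is
        // dropped, recv() returns None and the cached value is returned instead.
        if let Some(code) = waiter.0.recv().await {
            waiter.1 = code;
        }
        waiter.1
    }
}

#[tokio::main]
async fn main() {
    let mut vmm = FakeVmm::new();
    vmm.start();
    println!("exit code: {}", vmm.wait_vm().await);
    println!("exit code again: {}", vmm.wait_vm().await);
}

Using try_send() rather than send().await means the watcher never blocks even if nothing ever calls wait_vm(), and the cached code keeps repeated or late wait_vm() calls from hanging on a closed channel.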