diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs index 8d77920c63..d99321a74a 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs @@ -16,7 +16,6 @@ use persist::sandbox_persist::Persist; use std::collections::HashMap; use std::os::unix::net::UnixStream; use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::{process::Child, sync::mpsc}; @@ -79,13 +78,12 @@ pub struct CloudHypervisorInner { pub(crate) _guest_memory_block_size_mb: u32, pub(crate) exit_notify: Option>, - pub(crate) exit_waiter: Mutex<(mpsc::Receiver, i32)>, } const CH_DEFAULT_TIMEOUT_SECS: u32 = 10; impl CloudHypervisorInner { - pub fn new() -> Self { + pub fn new(exit_notify: Option>) -> Self { let mut capabilities = Capabilities::new(); capabilities.set( CapabilityBits::BlockDeviceSupport @@ -95,7 +93,6 @@ impl CloudHypervisorInner { ); let (tx, rx) = channel(true); - let (exit_notify, exit_waiter) = mpsc::channel(1); Self { api_socket: None, @@ -122,8 +119,7 @@ impl CloudHypervisorInner { ch_features: None, _guest_memory_block_size_mb: 0, - exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), + exit_notify, } } @@ -138,14 +134,14 @@ impl CloudHypervisorInner { impl Default for CloudHypervisorInner { fn default() -> Self { - Self::new() + Self::new(None) } } #[async_trait] impl Persist for CloudHypervisorInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender; // Return a state object that will be saved by the caller. async fn save(&self) -> Result { @@ -166,11 +162,10 @@ impl Persist for CloudHypervisorInner { // Set the hypervisor state to the specified state async fn restore( - _hypervisor_args: Self::ConstructorArgs, + exit_notify: mpsc::Sender, hypervisor_state: Self::State, ) -> Result { let (tx, rx) = channel(true); - let (exit_notify, exit_waiter) = mpsc::channel(1); let mut ch = Self { config: Some(hypervisor_state.config), @@ -190,7 +185,6 @@ impl Persist for CloudHypervisorInner { jailer_root: String::default(), ch_features: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), ..Default::default() }; @@ -207,7 +201,9 @@ mod tests { #[actix_rt::test] async fn test_save_clh() { - let mut clh = CloudHypervisorInner::new(); + let (exit_notify, _exit_waiter) = mpsc::channel(1); + + let mut clh = CloudHypervisorInner::new(Some(exit_notify.clone())); clh.id = String::from("123456"); clh.netns = Some(String::from("/var/run/netns/testnet")); clh.vm_path = String::from("/opt/kata/bin/cloud-hypervisor"); @@ -229,7 +225,7 @@ mod tests { assert!(!state.jailed); assert_eq!(state.hypervisor_type, HYPERVISOR_NAME_CH.to_string()); - let clh = CloudHypervisorInner::restore((), state.clone()) + let clh = CloudHypervisorInner::restore(exit_notify, state.clone()) .await .unwrap(); assert_eq!(clh.id, state.id); diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs index ac15e81479..49ae8a02e5 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs @@ -657,14 +657,9 @@ impl CloudHypervisorInner { Ok(()) } + #[allow(dead_code)] pub(crate) async fn wait_vm(&self) -> Result { - debug!(sl!(), "Waiting CH vmm"); - let mut waiter = self.exit_waiter.lock().await; - if let Some(exitcode) = waiter.0.recv().await { - waiter.1 = exitcode; - } - - Ok(waiter.1) + Ok(0) } pub(crate) fn pause_vm(&self) -> Result<()> { diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs index 7617646ea4..2dd16a9b7a 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs @@ -12,7 +12,7 @@ use kata_types::capabilities::{Capabilities, CapabilityBits}; use kata_types::config::hypervisor::Hypervisor as HypervisorConfig; use persist::sandbox_persist::Persist; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, Mutex, RwLock}; // Convenience macro to obtain the scope logger #[macro_export] @@ -29,15 +29,19 @@ mod utils; use inner::CloudHypervisorInner; -#[derive(Debug, Default, Clone)] +#[derive(Debug)] pub struct CloudHypervisor { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver, i32)>, } impl CloudHypervisor { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(CloudHypervisorInner::new())), + inner: Arc::new(RwLock::new(CloudHypervisorInner::new(Some(exit_notify)))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -47,6 +51,12 @@ impl CloudHypervisor { } } +impl Default for CloudHypervisor { + fn default() -> Self { + Self::new() + } +} + #[async_trait] impl Hypervisor for CloudHypervisor { async fn prepare_vm(&self, id: &str, netns: Option) -> Result<()> { @@ -65,8 +75,13 @@ impl Hypervisor for CloudHypervisor { } async fn wait_vm(&self) -> Result { - let inner = self.inner.read().await; - inner.wait_vm().await + debug!(sl!(), "Waiting CH vmm"); + let mut waiter = self.exit_waiter.lock().await; + if let Some(exitcode) = waiter.0.recv().await { + waiter.1 = exitcode; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -204,12 +219,15 @@ impl Persist for CloudHypervisor { } async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = CloudHypervisorInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + + let inner = CloudHypervisorInner::restore(exit_notify, hypervisor_state).await?; Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs index be9b7b786a..45859b5b10 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs @@ -46,14 +46,12 @@ pub struct FcInner { pub(crate) capabilities: Capabilities, pub(crate) fc_process: Mutex>, pub(crate) exit_notify: Option>, - pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl FcInner { - pub fn new() -> FcInner { + pub fn new(exit_notify: mpsc::Sender<()>) -> FcInner { let mut capabilities = Capabilities::new(); capabilities.set(CapabilityBits::BlockDeviceSupport); - let (exit_notify, exit_waiter) = mpsc::channel(1); FcInner { id: String::default(), @@ -71,7 +69,6 @@ impl FcInner { capabilities, fc_process: Mutex::new(None), exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -124,11 +121,10 @@ impl FcInner { let mut child = cmd.stderr(Stdio::piped()).spawn()?; let stderr = child.stderr.take().unwrap(); - let exit_notify: mpsc::Sender<()> = self + let exit_notify = self .exit_notify .take() .ok_or_else(|| anyhow!("no exit notify"))?; - tokio::spawn(log_fc_stderr(stderr, exit_notify)); match child.id() { @@ -216,7 +212,7 @@ async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Re #[async_trait] impl Persist for FcInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender<()>; async fn save(&self) -> Result { Ok(HypervisorState { @@ -231,12 +227,7 @@ impl Persist for FcInner { ..Default::default() }) } - async fn restore( - _hypervisor_args: Self::ConstructorArgs, - hypervisor_state: Self::State, - ) -> Result { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result { Ok(FcInner { id: hypervisor_state.id, asock_path: String::default(), @@ -253,7 +244,6 @@ impl Persist for FcInner { capabilities: Capabilities::new(), fc_process: Mutex::new(None), exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs index bb3bbbff6c..2178e7b2f7 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs @@ -102,21 +102,14 @@ impl FcInner { } pub(crate) async fn wait_vm(&self) -> Result { - debug!(sl(), "Wait fc sandbox"); - let mut waiter = self.exit_waiter.lock().await; - - //wait until the fc process exited. - waiter.0.recv().await; - let mut fc_process = self.fc_process.lock().await; if let Some(mut fc_process) = fc_process.take() { - if let Ok(status) = fc_process.wait().await { - waiter.1 = status.code().unwrap_or(0); - } + let status = fc_process.wait().await?; + Ok(status.code().unwrap_or(0)) + } else { + Err(anyhow!("the process has been reaped")) } - - Ok(waiter.1) } pub(crate) fn pause_vm(&self) -> Result<()> { diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs index adf77964c2..3b65eadd88 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs @@ -19,11 +19,14 @@ use kata_types::capabilities::Capabilities; use kata_types::capabilities::CapabilityBits; use persist::sandbox_persist::Persist; use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::Mutex; use tokio::sync::RwLock; #[derive(Debug)] pub struct Firecracker { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } // Convenience function to set the scope. @@ -39,8 +42,11 @@ impl Default for Firecracker { impl Firecracker { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(FcInner::new())), + inner: Arc::new(RwLock::new(FcInner::new(exit_notify))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -68,8 +74,18 @@ impl Hypervisor for Firecracker { } async fn wait_vm(&self) -> Result { + debug!(sl(), "Wait fc sandbox"); + let mut waiter = self.exit_waiter.lock().await; + + //wait until the fc process exited. + waiter.0.recv().await; + let inner = self.inner.read().await; - inner.wait_vm().await + if let Ok(exit_code) = inner.wait_vm().await { + waiter.1 = exit_code; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -209,12 +225,15 @@ impl Persist for Firecracker { } /// Restore a component from a specified state. async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = FcInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + let inner = FcInner::restore(exit_notify, hypervisor_state).await?; + Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs index fde475ca70..e292662a4b 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs @@ -43,13 +43,10 @@ pub struct QemuInner { netns: Option, exit_notify: Option>, - exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl QemuInner { - pub fn new() -> QemuInner { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + pub fn new(exit_notify: mpsc::Sender<()>) -> QemuInner { QemuInner { id: "".to_string(), qemu_process: Mutex::new(None), @@ -59,7 +56,6 @@ impl QemuInner { netns: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -202,22 +198,14 @@ impl QemuInner { } pub(crate) async fn wait_vm(&self) -> Result { - info!(sl!(), "Wait QEMU VM"); - - let mut waiter = self.exit_waiter.lock().await; - - //wait until the qemu process exited. - waiter.0.recv().await; - let mut qemu_process = self.qemu_process.lock().await; if let Some(mut qemu_process) = qemu_process.take() { - if let Ok(status) = qemu_process.wait().await { - waiter.1 = status.code().unwrap_or(0); - } + let status = qemu_process.wait().await?; + Ok(status.code().unwrap_or(0)) + } else { + Err(anyhow!("the process has been reaped")) } - - Ok(waiter.1) } pub(crate) fn pause_vm(&self) -> Result<()> { @@ -589,7 +577,7 @@ impl QemuInner { #[async_trait] impl Persist for QemuInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender<()>; /// Save a state of hypervisor async fn save(&self) -> Result { @@ -602,12 +590,7 @@ impl Persist for QemuInner { } /// Restore hypervisor - async fn restore( - _hypervisor_args: Self::ConstructorArgs, - hypervisor_state: Self::State, - ) -> Result { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result { Ok(QemuInner { id: hypervisor_state.id, qemu_process: Mutex::new(None), @@ -617,7 +600,6 @@ impl Persist for QemuInner { netns: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs index 32497c3584..6a6c923cab 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs @@ -20,10 +20,12 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::sync::RwLock; +use tokio::sync::{mpsc, Mutex}; #[derive(Debug)] pub struct Qemu { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl Default for Qemu { @@ -34,8 +36,11 @@ impl Default for Qemu { impl Qemu { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(QemuInner::new())), + inner: Arc::new(RwLock::new(QemuInner::new(exit_notify))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -63,8 +68,19 @@ impl Hypervisor for Qemu { } async fn wait_vm(&self) -> Result { + info!(sl!(), "Wait QEMU VM"); + + let mut waiter = self.exit_waiter.lock().await; + + //wait until the qemu process exited. + waiter.0.recv().await; + let inner = self.inner.read().await; - inner.wait_vm().await + if let Ok(exit_code) = inner.wait_vm().await { + waiter.1 = exit_code; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -204,12 +220,15 @@ impl Persist for Qemu { /// Restore a component from a specified state. async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = QemuInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + + let inner = QemuInner::restore(exit_notify, hypervisor_state).await?; Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } }