sandbox: fix the issue of hypervisor's wait_vm

wait_vm is called before stop_vm and takes the reader lock
while it waits, which blocks stop_vm from getting the writer
lock and thus triggers a deadlock.

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
Fupan Li
2024-09-19 09:25:33 +08:00
parent 79a3b4e2e5
commit ba94eed891
8 changed files with 97 additions and 85 deletions

View File

@@ -16,7 +16,6 @@ use persist::sandbox_persist::Persist;
use std::collections::HashMap;
use std::os::unix::net::UnixStream;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::{process::Child, sync::mpsc};
@@ -79,13 +78,12 @@ pub struct CloudHypervisorInner {
pub(crate) _guest_memory_block_size_mb: u32,
pub(crate) exit_notify: Option<mpsc::Sender<i32>>,
pub(crate) exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
}
const CH_DEFAULT_TIMEOUT_SECS: u32 = 10;
impl CloudHypervisorInner {
pub fn new() -> Self {
pub fn new(exit_notify: Option<mpsc::Sender<i32>>) -> Self {
let mut capabilities = Capabilities::new();
capabilities.set(
CapabilityBits::BlockDeviceSupport
@@ -95,7 +93,6 @@ impl CloudHypervisorInner {
);
let (tx, rx) = channel(true);
let (exit_notify, exit_waiter) = mpsc::channel(1);
Self {
api_socket: None,
@@ -122,8 +119,7 @@ impl CloudHypervisorInner {
ch_features: None,
_guest_memory_block_size_mb: 0,
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
exit_notify,
}
}
@@ -138,14 +134,14 @@ impl CloudHypervisorInner {
impl Default for CloudHypervisorInner {
fn default() -> Self {
Self::new()
Self::new(None)
}
}
#[async_trait]
impl Persist for CloudHypervisorInner {
type State = HypervisorState;
type ConstructorArgs = ();
type ConstructorArgs = mpsc::Sender<i32>;
// Return a state object that will be saved by the caller.
async fn save(&self) -> Result<Self::State> {
@@ -166,11 +162,10 @@ impl Persist for CloudHypervisorInner {
// Set the hypervisor state to the specified state
async fn restore(
_hypervisor_args: Self::ConstructorArgs,
exit_notify: mpsc::Sender<i32>,
hypervisor_state: Self::State,
) -> Result<Self> {
let (tx, rx) = channel(true);
let (exit_notify, exit_waiter) = mpsc::channel(1);
let mut ch = Self {
config: Some(hypervisor_state.config),
@@ -190,7 +185,6 @@ impl Persist for CloudHypervisorInner {
jailer_root: String::default(),
ch_features: None,
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
..Default::default()
};
@@ -207,7 +201,9 @@ mod tests {
#[actix_rt::test]
async fn test_save_clh() {
let mut clh = CloudHypervisorInner::new();
let (exit_notify, _exit_waiter) = mpsc::channel(1);
let mut clh = CloudHypervisorInner::new(Some(exit_notify.clone()));
clh.id = String::from("123456");
clh.netns = Some(String::from("/var/run/netns/testnet"));
clh.vm_path = String::from("/opt/kata/bin/cloud-hypervisor");
@@ -229,7 +225,7 @@ mod tests {
assert!(!state.jailed);
assert_eq!(state.hypervisor_type, HYPERVISOR_NAME_CH.to_string());
let clh = CloudHypervisorInner::restore((), state.clone())
let clh = CloudHypervisorInner::restore(exit_notify, state.clone())
.await
.unwrap();
assert_eq!(clh.id, state.id);

View File

@@ -657,14 +657,9 @@ impl CloudHypervisorInner {
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn wait_vm(&self) -> Result<i32> {
debug!(sl!(), "Waiting CH vmm");
let mut waiter = self.exit_waiter.lock().await;
if let Some(exitcode) = waiter.0.recv().await {
waiter.1 = exitcode;
}
Ok(waiter.1)
Ok(0)
}
pub(crate) fn pause_vm(&self) -> Result<()> {

View File

@@ -12,7 +12,7 @@ use kata_types::capabilities::{Capabilities, CapabilityBits};
use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
use persist::sandbox_persist::Persist;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, Mutex, RwLock};
// Convenience macro to obtain the scope logger
#[macro_export]
@@ -29,15 +29,19 @@ mod utils;
use inner::CloudHypervisorInner;
#[derive(Debug, Default, Clone)]
#[derive(Debug)]
pub struct CloudHypervisor {
inner: Arc<RwLock<CloudHypervisorInner>>,
exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
}
impl CloudHypervisor {
pub fn new() -> Self {
let (exit_notify, exit_waiter) = mpsc::channel(1);
Self {
inner: Arc::new(RwLock::new(CloudHypervisorInner::new())),
inner: Arc::new(RwLock::new(CloudHypervisorInner::new(Some(exit_notify)))),
exit_waiter: Mutex::new((exit_waiter, 0)),
}
}
@@ -47,6 +51,12 @@ impl CloudHypervisor {
}
}
impl Default for CloudHypervisor {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Hypervisor for CloudHypervisor {
async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()> {
@@ -65,8 +75,13 @@ impl Hypervisor for CloudHypervisor {
}
async fn wait_vm(&self) -> Result<i32> {
let inner = self.inner.read().await;
inner.wait_vm().await
debug!(sl!(), "Waiting CH vmm");
let mut waiter = self.exit_waiter.lock().await;
if let Some(exitcode) = waiter.0.recv().await {
waiter.1 = exitcode;
}
Ok(waiter.1)
}
async fn pause_vm(&self) -> Result<()> {
@@ -204,12 +219,15 @@ impl Persist for CloudHypervisor {
}
async fn restore(
hypervisor_args: Self::ConstructorArgs,
_hypervisor_args: Self::ConstructorArgs,
hypervisor_state: Self::State,
) -> Result<Self> {
let inner = CloudHypervisorInner::restore(hypervisor_args, hypervisor_state).await?;
let (exit_notify, exit_waiter) = mpsc::channel(1);
let inner = CloudHypervisorInner::restore(exit_notify, hypervisor_state).await?;
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
exit_waiter: Mutex::new((exit_waiter, 0)),
})
}
}

View File

@@ -46,14 +46,12 @@ pub struct FcInner {
pub(crate) capabilities: Capabilities,
pub(crate) fc_process: Mutex<Option<Child>>,
pub(crate) exit_notify: Option<mpsc::Sender<()>>,
pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
}
impl FcInner {
pub fn new() -> FcInner {
pub fn new(exit_notify: mpsc::Sender<()>) -> FcInner {
let mut capabilities = Capabilities::new();
capabilities.set(CapabilityBits::BlockDeviceSupport);
let (exit_notify, exit_waiter) = mpsc::channel(1);
FcInner {
id: String::default(),
@@ -71,7 +69,6 @@ impl FcInner {
capabilities,
fc_process: Mutex::new(None),
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
}
}
@@ -124,11 +121,10 @@ impl FcInner {
let mut child = cmd.stderr(Stdio::piped()).spawn()?;
let stderr = child.stderr.take().unwrap();
let exit_notify: mpsc::Sender<()> = self
let exit_notify = self
.exit_notify
.take()
.ok_or_else(|| anyhow!("no exit notify"))?;
tokio::spawn(log_fc_stderr(stderr, exit_notify));
match child.id() {
@@ -216,7 +212,7 @@ async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Re
#[async_trait]
impl Persist for FcInner {
type State = HypervisorState;
type ConstructorArgs = ();
type ConstructorArgs = mpsc::Sender<()>;
async fn save(&self) -> Result<Self::State> {
Ok(HypervisorState {
@@ -231,12 +227,7 @@ impl Persist for FcInner {
..Default::default()
})
}
async fn restore(
_hypervisor_args: Self::ConstructorArgs,
hypervisor_state: Self::State,
) -> Result<Self> {
let (exit_notify, exit_waiter) = mpsc::channel(1);
async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result<Self> {
Ok(FcInner {
id: hypervisor_state.id,
asock_path: String::default(),
@@ -253,7 +244,6 @@ impl Persist for FcInner {
capabilities: Capabilities::new(),
fc_process: Mutex::new(None),
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
})
}
}

View File

@@ -102,21 +102,14 @@ impl FcInner {
}
pub(crate) async fn wait_vm(&self) -> Result<i32> {
debug!(sl(), "Wait fc sandbox");
let mut waiter = self.exit_waiter.lock().await;
//wait until the fc process exited.
waiter.0.recv().await;
let mut fc_process = self.fc_process.lock().await;
if let Some(mut fc_process) = fc_process.take() {
if let Ok(status) = fc_process.wait().await {
waiter.1 = status.code().unwrap_or(0);
}
let status = fc_process.wait().await?;
Ok(status.code().unwrap_or(0))
} else {
Err(anyhow!("the process has been reaped"))
}
Ok(waiter.1)
}
pub(crate) fn pause_vm(&self) -> Result<()> {

View File

@@ -19,11 +19,14 @@ use kata_types::capabilities::Capabilities;
use kata_types::capabilities::CapabilityBits;
use persist::sandbox_persist::Persist;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
#[derive(Debug)]
pub struct Firecracker {
inner: Arc<RwLock<FcInner>>,
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
}
// Convenience function to set the scope.
@@ -39,8 +42,11 @@ impl Default for Firecracker {
impl Firecracker {
pub fn new() -> Self {
let (exit_notify, exit_waiter) = mpsc::channel(1);
Self {
inner: Arc::new(RwLock::new(FcInner::new())),
inner: Arc::new(RwLock::new(FcInner::new(exit_notify))),
exit_waiter: Mutex::new((exit_waiter, 0)),
}
}
@@ -68,8 +74,18 @@ impl Hypervisor for Firecracker {
}
async fn wait_vm(&self) -> Result<i32> {
debug!(sl(), "Wait fc sandbox");
let mut waiter = self.exit_waiter.lock().await;
//wait until the fc process exited.
waiter.0.recv().await;
let inner = self.inner.read().await;
inner.wait_vm().await
if let Ok(exit_code) = inner.wait_vm().await {
waiter.1 = exit_code;
}
Ok(waiter.1)
}
async fn pause_vm(&self) -> Result<()> {
@@ -209,12 +225,15 @@ impl Persist for Firecracker {
}
/// Restore a component from a specified state.
async fn restore(
hypervisor_args: Self::ConstructorArgs,
_hypervisor_args: Self::ConstructorArgs,
hypervisor_state: Self::State,
) -> Result<Self> {
let inner = FcInner::restore(hypervisor_args, hypervisor_state).await?;
let (exit_notify, exit_waiter) = mpsc::channel(1);
let inner = FcInner::restore(exit_notify, hypervisor_state).await?;
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
exit_waiter: Mutex::new((exit_waiter, 0)),
})
}
}

View File

@@ -43,13 +43,10 @@ pub struct QemuInner {
netns: Option<String>,
exit_notify: Option<mpsc::Sender<()>>,
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
}
impl QemuInner {
pub fn new() -> QemuInner {
let (exit_notify, exit_waiter) = mpsc::channel(1);
pub fn new(exit_notify: mpsc::Sender<()>) -> QemuInner {
QemuInner {
id: "".to_string(),
qemu_process: Mutex::new(None),
@@ -59,7 +56,6 @@ impl QemuInner {
netns: None,
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
}
}
@@ -202,22 +198,14 @@ impl QemuInner {
}
pub(crate) async fn wait_vm(&self) -> Result<i32> {
info!(sl!(), "Wait QEMU VM");
let mut waiter = self.exit_waiter.lock().await;
//wait until the qemu process exited.
waiter.0.recv().await;
let mut qemu_process = self.qemu_process.lock().await;
if let Some(mut qemu_process) = qemu_process.take() {
if let Ok(status) = qemu_process.wait().await {
waiter.1 = status.code().unwrap_or(0);
}
let status = qemu_process.wait().await?;
Ok(status.code().unwrap_or(0))
} else {
Err(anyhow!("the process has been reaped"))
}
Ok(waiter.1)
}
pub(crate) fn pause_vm(&self) -> Result<()> {
@@ -589,7 +577,7 @@ impl QemuInner {
#[async_trait]
impl Persist for QemuInner {
type State = HypervisorState;
type ConstructorArgs = ();
type ConstructorArgs = mpsc::Sender<()>;
/// Save a state of hypervisor
async fn save(&self) -> Result<Self::State> {
@@ -602,12 +590,7 @@ impl Persist for QemuInner {
}
/// Restore hypervisor
async fn restore(
_hypervisor_args: Self::ConstructorArgs,
hypervisor_state: Self::State,
) -> Result<Self> {
let (exit_notify, exit_waiter) = mpsc::channel(1);
async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result<Self> {
Ok(QemuInner {
id: hypervisor_state.id,
qemu_process: Mutex::new(None),
@@ -617,7 +600,6 @@ impl Persist for QemuInner {
netns: None,
exit_notify: Some(exit_notify),
exit_waiter: Mutex::new((exit_waiter, 0)),
})
}
}

View File

@@ -20,10 +20,12 @@ use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, Mutex};
#[derive(Debug)]
pub struct Qemu {
inner: Arc<RwLock<QemuInner>>,
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
}
impl Default for Qemu {
@@ -34,8 +36,11 @@ impl Default for Qemu {
impl Qemu {
pub fn new() -> Self {
let (exit_notify, exit_waiter) = mpsc::channel(1);
Self {
inner: Arc::new(RwLock::new(QemuInner::new())),
inner: Arc::new(RwLock::new(QemuInner::new(exit_notify))),
exit_waiter: Mutex::new((exit_waiter, 0)),
}
}
@@ -63,8 +68,19 @@ impl Hypervisor for Qemu {
}
async fn wait_vm(&self) -> Result<i32> {
info!(sl!(), "Wait QEMU VM");
let mut waiter = self.exit_waiter.lock().await;
//wait until the qemu process exited.
waiter.0.recv().await;
let inner = self.inner.read().await;
inner.wait_vm().await
if let Ok(exit_code) = inner.wait_vm().await {
waiter.1 = exit_code;
}
Ok(waiter.1)
}
async fn pause_vm(&self) -> Result<()> {
@@ -204,12 +220,15 @@ impl Persist for Qemu {
/// Restore a component from a specified state.
async fn restore(
hypervisor_args: Self::ConstructorArgs,
_hypervisor_args: Self::ConstructorArgs,
hypervisor_state: Self::State,
) -> Result<Self> {
let inner = QemuInner::restore(hypervisor_args, hypervisor_state).await?;
let (exit_notify, exit_waiter) = mpsc::channel(1);
let inner = QemuInner::restore(exit_notify, hypervisor_state).await?;
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
exit_waiter: Mutex::new((exit_waiter, 0)),
})
}
}