Merge pull request #10148 from lifupan/main_sandboxapi

runtime-rs: Add the wait_vm support for hypervisors

Repository: https://github.com/kata-containers/kata-containers.git
Commit: 365df81d5e
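The hunks that follow apply one pattern to every backend (Cloud Hypervisor, Dragonball, Firecracker, QEMU): a bounded tokio mpsc channel of capacity 1 is created alongside the hypervisor, its Sender goes to whichever task owns the VMM process or thread, and the new wait_vm() blocks on the Receiver and caches the exit code so repeat callers get the same answer. Below is a minimal standalone sketch of that pattern; FakeVmm and everything in it are illustrative stand-ins, not code from this PR.

use tokio::sync::{mpsc, Mutex};

// Illustrative stand-in for a hypervisor backend; not a type from the PR.
struct FakeVmm {
    // Handed out once, to the task that watches the VMM process/thread.
    exit_notify: Option<mpsc::Sender<i32>>,
    // Receiver plus a cached exit code for repeat callers.
    exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
}

impl FakeVmm {
    fn new() -> Self {
        let (exit_notify, exit_waiter) = mpsc::channel(1);
        Self {
            exit_notify: Some(exit_notify),
            exit_waiter: Mutex::new((exit_waiter, 0)),
        }
    }

    fn start(&mut self) {
        // In the real diff the sender goes to the stderr-logger task
        // (QEMU/Firecracker/CH) or to the vmm thread (Dragonball).
        let exit_notify = self.exit_notify.take().unwrap();
        tokio::spawn(async move {
            let _ = exit_notify.try_send(0); // pretend the VMM exited with 0
        });
    }

    async fn wait_vm(&self) -> i32 {
        // The Mutex serializes concurrent waiters; once the sender is gone,
        // recv() yields None and the cached code in the tuple is returned.
        let mut waiter = self.exit_waiter.lock().await;
        if let Some(code) = waiter.0.recv().await {
            waiter.1 = code;
        }
        waiter.1
    }
}

#[tokio::main]
async fn main() {
    let mut vmm = FakeVmm::new();
    vmm.start();
    println!("vmm exit code: {}", vmm.wait_vm().await);
}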
@@ -15,9 +15,10 @@ use kata_types::config::hypervisor::HYPERVISOR_NAME_CH;
 use persist::sandbox_persist::Persist;
 use std::collections::HashMap;
 use std::os::unix::net::UnixStream;
-use tokio::process::Child;
 use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
+use tokio::{process::Child, sync::mpsc};

 #[derive(Debug)]
 pub struct CloudHypervisorInner {
@@ -76,6 +77,9 @@ pub struct CloudHypervisorInner {

     /// Size of memory block of guest OS in MB (currently unused)
     pub(crate) _guest_memory_block_size_mb: u32,
+
+    pub(crate) exit_notify: Option<mpsc::Sender<i32>>,
+    pub(crate) exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
 }

 const CH_DEFAULT_TIMEOUT_SECS: u32 = 10;
@@ -91,6 +95,7 @@ impl CloudHypervisorInner {
         );

         let (tx, rx) = channel(true);
+        let (exit_notify, exit_waiter) = mpsc::channel(1);

         Self {
             api_socket: None,
@@ -116,6 +121,9 @@ impl CloudHypervisorInner {
             guest_protection_to_use: GuestProtection::NoProtection,
             ch_features: None,
             _guest_memory_block_size_mb: 0,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }

@@ -162,6 +170,7 @@ impl Persist for CloudHypervisorInner {
         hypervisor_state: Self::State,
     ) -> Result<Self> {
         let (tx, rx) = channel(true);
+        let (exit_notify, exit_waiter) = mpsc::channel(1);

         let mut ch = Self {
             config: Some(hypervisor_state.config),
@@ -180,6 +189,8 @@ impl Persist for CloudHypervisorInner {
             timeout_secs: CH_DEFAULT_TIMEOUT_SECS as i32,
             jailer_root: String::default(),
             ch_features: None,
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),

             ..Default::default()
         };

@@ -37,13 +37,13 @@ use std::os::unix::net::UnixStream;
 use std::path::Path;
 use std::process::Stdio;
 use std::sync::{Arc, RwLock};
-use tokio::io::AsyncBufReadExt;
 use tokio::io::BufReader;
 use tokio::process::{Child, Command};
 use tokio::sync::watch::Receiver;
 use tokio::task;
 use tokio::task::JoinHandle;
 use tokio::time::Duration;
+use tokio::{io::AsyncBufReadExt, sync::mpsc};

 const CH_NAME: &str = "cloud-hypervisor";

@@ -410,7 +410,13 @@ impl CloudHypervisorInner {
             .map_err(|e| anyhow!(e))?
             .clone();

-        let ch_outputlogger_task = tokio::spawn(cloud_hypervisor_log_output(child, shutdown));
+        let exit_notify: mpsc::Sender<i32> = self
+            .exit_notify
+            .take()
+            .ok_or_else(|| anyhow!("no exit notify"))?;
+
+        let ch_outputlogger_task =
+            tokio::spawn(cloud_hypervisor_log_output(child, shutdown, exit_notify));

         let tasks = vec![ch_outputlogger_task];

@@ -651,6 +657,16 @@ impl CloudHypervisorInner {
         Ok(())
     }

+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        debug!(sl!(), "Waiting CH vmm");
+        let mut waiter = self.exit_waiter.lock().await;
+        if let Some(exitcode) = waiter.0.recv().await {
+            waiter.1 = exitcode;
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         Ok(())
     }
@@ -778,7 +794,11 @@ impl CloudHypervisorInner {
 // Log all output from the CH process until a shutdown signal is received.
 // When that happens, stop logging and wait for the child process to finish
 // before returning.
-async fn cloud_hypervisor_log_output(mut child: Child, mut shutdown: Receiver<bool>) -> Result<()> {
+async fn cloud_hypervisor_log_output(
+    mut child: Child,
+    mut shutdown: Receiver<bool>,
+    exit_notify: mpsc::Sender<i32>,
+) -> Result<()> {
     let stdout = child
         .stdout
         .as_mut()
@@ -833,7 +853,10 @@ async fn cloud_hypervisor_log_output(mut child: Child, mut shutdown: Receiver<bool>
     }

     // Note that this kills _and_ waits for the process!
-    child.kill().await?;
+    let _ = child.kill().await;
+    if let Ok(status) = child.wait().await {
+        let _ = exit_notify.try_send(status.code().unwrap_or(0));
+    }

     Ok(())
 }

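A note on the kill path in the last hunk above: tokio's Child::kill() reaps the process but returns only io::Result<()>, discarding the exit status. Splitting it into kill() followed by wait() lets the logger task recover the ExitStatus and forward its code through exit_notify; tokio caches a child's status once collected, so calling wait() after kill() is safe.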
@@ -64,6 +64,11 @@ impl Hypervisor for CloudHypervisor {
         inner.stop_vm()
     }

+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.write().await;
         inner.pause_vm()

@@ -29,6 +29,7 @@ use nix::mount::MsFlags;
 use persist::sandbox_persist::Persist;
 use std::cmp::Ordering;
 use std::{collections::HashSet, fs::create_dir_all};
+use tokio::sync::mpsc;

 const DRAGONBALL_KERNEL: &str = "vmlinux";
 const DRAGONBALL_INITRD: &str = "initrd";
@@ -88,7 +89,7 @@ pub struct DragonballInner {
 }

 impl DragonballInner {
-    pub fn new() -> DragonballInner {
+    pub fn new(exit_notify: mpsc::Sender<i32>) -> DragonballInner {
         let mut capabilities = Capabilities::new();
         capabilities.set(
             CapabilityBits::BlockDeviceSupport
@@ -106,7 +107,7 @@ impl DragonballInner {
             pending_devices: vec![],
             state: VmmState::NotReady,
             jailed: false,
-            vmm_instance: VmmInstance::new(""),
+            vmm_instance: VmmInstance::new("", exit_notify),
             run_dir: "".to_string(),
             cached_block_devices: Default::default(),
             capabilities,
@@ -498,7 +499,7 @@ impl DragonballInner {
 #[async_trait]
 impl Persist for DragonballInner {
     type State = HypervisorState;
-    type ConstructorArgs = ();
+    type ConstructorArgs = mpsc::Sender<i32>;

     /// Save a state of hypervisor
     async fn save(&self) -> Result<Self::State> {
@@ -519,7 +520,7 @@ impl Persist for DragonballInner {

     /// Restore hypervisor
     async fn restore(
-        _hypervisor_args: Self::ConstructorArgs,
+        hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
         Ok(DragonballInner {
@@ -530,7 +531,7 @@ impl Persist for DragonballInner {
             netns: hypervisor_state.netns,
             config: hypervisor_state.config,
             state: VmmState::NotReady,
-            vmm_instance: VmmInstance::new(""),
+            vmm_instance: VmmInstance::new("", hypervisor_args),
             run_dir: hypervisor_state.run_dir,
             pending_devices: vec![],
             cached_block_devices: hypervisor_state.cached_block_devices,

@@ -7,6 +7,13 @@
 use std::convert::TryFrom;
 use std::path::PathBuf;

+use super::{build_dragonball_network_config, DragonballInner};
+use crate::device::pci_path::PciPath;
+use crate::VhostUserConfig;
+use crate::{
+    device::DeviceType, HybridVsockConfig, NetworkConfig, ShareFsConfig, ShareFsMountConfig,
+    ShareFsMountOperation, ShareFsMountType, VfioDevice, VmmState, JAILER_ROOT,
+};
 use anyhow::{anyhow, Context, Result};
 use dbs_utils::net::MacAddr;
 use dragonball::api::v1::VhostUserConfig as DragonballVhostUserConfig;
@@ -19,14 +26,6 @@ use dragonball::device_manager::{
     vfio_dev_mgr::{HostDeviceConfig, VfioPciDeviceConfig},
 };

-use super::{build_dragonball_network_config, DragonballInner};
-use crate::device::pci_path::PciPath;
-use crate::VhostUserConfig;
-use crate::{
-    device::DeviceType, HybridVsockConfig, NetworkConfig, ShareFsConfig, ShareFsMountConfig,
-    ShareFsMountOperation, ShareFsMountType, VfioDevice, VmmState, JAILER_ROOT,
-};
-
 const MB_TO_B: u32 = 1024 * 1024;
 const DEFAULT_VIRTIO_FS_NUM_QUEUES: i32 = 1;
 const DEFAULT_VIRTIO_FS_QUEUE_SIZE: i32 = 1024;
@@ -431,12 +430,14 @@ impl DragonballInner {
 #[cfg(test)]
 mod tests {
     use dragonball::api::v1::FsDeviceConfigInfo;
+    use tokio::sync::mpsc;

     use crate::dragonball::DragonballInner;

     #[test]
     fn test_parse_inline_virtiofs_args() {
-        let mut dragonball = DragonballInner::new();
+        let (tx, _) = mpsc::channel(1);
+        let mut dragonball = DragonballInner::new(tx);
         let mut fs_cfg = FsDeviceConfigInfo::default();

         // no_open and writeback_cache is the default, so test open and no_writeback_cache. "-d"

@@ -23,13 +23,14 @@ use dragonball::api::v1::{
 };
 use kata_types::capabilities::{Capabilities, CapabilityBits};
 use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
-use tokio::sync::RwLock;
+use tokio::sync::{mpsc, Mutex, RwLock};
 use tracing::instrument;

 use crate::{DeviceType, Hypervisor, MemoryConfig, NetworkConfig, VcpuThreadIds};

 pub struct Dragonball {
     inner: Arc<RwLock<DragonballInner>>,
+    exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
 }

 impl std::fmt::Debug for Dragonball {
@@ -46,8 +47,11 @@ impl Default for Dragonball {

 impl Dragonball {
     pub fn new() -> Self {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Self {
-            inner: Arc::new(RwLock::new(DragonballInner::new())),
+            inner: Arc::new(RwLock::new(DragonballInner::new(exit_notify))),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }

@@ -81,6 +85,15 @@ impl Hypervisor for Dragonball {
         inner.stop_vm()
     }

+    async fn wait_vm(&self) -> Result<i32> {
+        let mut waiter = self.exit_waiter.lock().await;
+        if let Some(exit_code) = waiter.0.recv().await {
+            waiter.1 = exit_code;
+        }
+
+        Ok(waiter.1)
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()
@@ -218,12 +231,15 @@ impl Persist for Dragonball {
     }
     /// Restore a component from a specified state.
     async fn restore(
-        hypervisor_args: Self::ConstructorArgs,
+        _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
-        let inner = DragonballInner::restore(hypervisor_args, hypervisor_state).await?;
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
+        let inner = DragonballInner::restore(exit_notify, hypervisor_state).await?;
         Ok(Self {
             inner: Arc::new(RwLock::new(inner)),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }

@@ -28,6 +28,7 @@ use dragonball::{
 };
 use nix::sched::{setns, CloneFlags};
 use seccompiler::BpfProgram;
+use tokio::sync::mpsc;
 use vmm_sys_util::eventfd::EventFd;

 use crate::ShareFsMountOperation;
@@ -49,10 +50,11 @@ pub struct VmmInstance {
     to_vmm_fd: EventFd,
     seccomp: BpfProgram,
     vmm_thread: Option<thread::JoinHandle<Result<i32>>>,
+    exit_notify: Option<mpsc::Sender<i32>>,
 }

 impl VmmInstance {
-    pub fn new(id: &str) -> Self {
+    pub fn new(id: &str, exit_notify: mpsc::Sender<i32>) -> Self {
         let vmm_shared_info = Arc::new(RwLock::new(InstanceInfo::new(
             String::from(id),
             DRAGONBALL_VERSION.to_string(),
@@ -68,6 +70,7 @@ impl VmmInstance {
             to_vmm_fd,
             seccomp: vec![],
             vmm_thread: None,
+            exit_notify: Some(exit_notify),
         }
     }

@@ -122,6 +125,7 @@ impl VmmInstance {
         )
         .expect("Failed to start vmm");
         let vmm_shared_info = self.get_shared_info();
+        let exit_notify = self.exit_notify.take().unwrap();

         self.vmm_thread = Some(
             thread::Builder::new()
@@ -141,6 +145,12 @@ impl VmmInstance {
                     }
                     let exit_code =
                         Vmm::run_vmm_event_loop(Arc::new(Mutex::new(vmm)), vmm_service);
+                    exit_notify
+                        .try_send(exit_code)
+                        .map_err(|e| {
+                            error!(sl!(), "vmm-master thread fail to send. {:?}", e);
+                        })
+                        .ok();
                     debug!(sl!(), "run vmm thread exited: {}", exit_code);
                     Ok(exit_code)
                 }()

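Dragonball is the odd one out: its exit code comes from Vmm::run_vmm_event_loop() on a plain std::thread rather than from a child process, so the notification uses try_send(), which needs no async context and cannot block; on an empty capacity-1 channel it can only fail if the receiver is gone. A hedged sketch of that thread-to-async handoff, with placeholder names:

use tokio::sync::mpsc;

// Sketch: a std::thread (stand-in for the vmm event loop) reports its
// result into a tokio channel without needing an async runtime handle.
fn spawn_event_loop(exit_notify: mpsc::Sender<i32>) -> std::thread::JoinHandle<i32> {
    std::thread::spawn(move || {
        let exit_code = 0; // placeholder for Vmm::run_vmm_event_loop(...)
        // try_send() is non-blocking; it only errs if the channel is full
        // (impossible here: we send once) or the receiver was dropped.
        if let Err(e) = exit_notify.try_send(exit_code) {
            eprintln!("failed to send exit code: {e:?}");
        }
        exit_code
    })
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let handle = spawn_event_loop(tx);
    // The async side can now await the exit code.
    println!("vmm exited with {:?}", rx.recv().await);
    let _ = handle.join();
}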
@@ -8,7 +8,7 @@ use crate::HypervisorState;
 use crate::MemoryConfig;
 use crate::HYPERVISOR_FIRECRACKER;
 use crate::{device::DeviceType, VmmState};
-use anyhow::{Context, Result};
+use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use hyper::Client;
 use hyperlocal::{UnixClientExt, UnixConnector};
@@ -19,7 +19,12 @@ use kata_types::{
 use nix::sched::{setns, CloneFlags};
 use persist::sandbox_persist::Persist;
 use std::os::unix::io::AsRawFd;
-use tokio::process::Command;
+use std::process::Stdio;
+use tokio::io::AsyncBufReadExt;
+use tokio::io::BufReader;
+use tokio::process::{Child, ChildStderr, Command};
+use tokio::sync::mpsc;
+use tokio::sync::Mutex;

 unsafe impl Send for FcInner {}
 unsafe impl Sync for FcInner {}
@@ -39,12 +44,17 @@ pub struct FcInner {
     pub(crate) run_dir: String,
     pub(crate) pending_devices: Vec<DeviceType>,
     pub(crate) capabilities: Capabilities,
+    pub(crate) fc_process: Mutex<Option<Child>>,
+    pub(crate) exit_notify: Option<mpsc::Sender<()>>,
+    pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
 }

 impl FcInner {
     pub fn new() -> FcInner {
         let mut capabilities = Capabilities::new();
         capabilities.set(CapabilityBits::BlockDeviceSupport);
+        let (exit_notify, exit_waiter) = mpsc::channel(1);

         FcInner {
             id: String::default(),
             asock_path: String::default(),
@@ -59,6 +69,9 @@ impl FcInner {
             run_dir: String::default(),
             pending_devices: vec![],
             capabilities,
+            fc_process: Mutex::new(None),
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }

@@ -108,7 +121,15 @@ impl FcInner {
             });
         }

-        let mut child = cmd.spawn()?;
+        let mut child = cmd.stderr(Stdio::piped()).spawn()?;
+
+        let stderr = child.stderr.take().unwrap();
+        let exit_notify: mpsc::Sender<()> = self
+            .exit_notify
+            .take()
+            .ok_or_else(|| anyhow!("no exit notify"))?;
+
+        tokio::spawn(log_fc_stderr(stderr, exit_notify));

         match child.id() {
             Some(id) => {
@@ -122,8 +143,12 @@ impl FcInner {
             None => {
                 let exit_status = child.wait().await?;
                 error!(sl(), "Process exited, status: {:?}", exit_status);
+                return Err(anyhow!("fc vmm start failed with: {:?}", exit_status));
             }
         };

+        self.fc_process = Mutex::new(Some(child));
+
         Ok(())
     }

@@ -167,6 +192,27 @@ impl FcInner {
         }
     }

+async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Result<()> {
+    info!(sl!(), "starting reading fc stderr");
+
+    let stderr_reader = BufReader::new(stderr);
+    let mut stderr_lines = stderr_reader.lines();
+
+    while let Some(buffer) = stderr_lines
+        .next_line()
+        .await
+        .context("next_line() failed on fc stderr")?
+    {
+        info!(sl!(), "fc stderr: {:?}", buffer);
+    }
+
+    // Notify the waiter that the process exited.
+    let _ = exit_notify.try_send(());
+
+    info!(sl!(), "finished reading fc stderr");
+    Ok(())
+}
+
 #[async_trait]
 impl Persist for FcInner {
     type State = HypervisorState;
@@ -189,6 +235,8 @@ impl Persist for FcInner {
         _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Ok(FcInner {
             id: hypervisor_state.id,
             asock_path: String::default(),
@@ -203,6 +251,9 @@ impl Persist for FcInner {
             pending_devices: vec![],
             run_dir: hypervisor_state.run_dir,
             capabilities: Capabilities::new(),
+            fc_process: Mutex::new(None),
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }

@@ -78,17 +78,47 @@ impl FcInner {
         debug!(sl(), "Stopping sandbox");
         if self.state != VmmState::VmRunning {
             debug!(sl(), "VM not running!");
-        } else if let Some(pid_to_kill) = &self.pid {
-            let pid = ::nix::unistd::Pid::from_raw(*pid_to_kill as i32);
-            if let Err(err) = ::nix::sys::signal::kill(pid, nix::sys::signal::SIGKILL) {
-                if err != ::nix::Error::ESRCH {
-                    debug!(sl(), "Failed to kill VMM with pid {} {:?}", pid, err);
-                }
-            }
+        } else {
+            let mut fc_process = self.fc_process.lock().await;
+            if let Some(fc_process) = fc_process.as_mut() {
+                if fc_process.id().is_some() {
+                    info!(sl!(), "FcInner::stop_vm(): kill()'ing fc");
+                    return fc_process.kill().await.map_err(anyhow::Error::from);
+                } else {
+                    info!(
+                        sl!(),
+                        "FcInner::stop_vm(): fc process isn't running (likely stopped already)"
+                    );
+                }
+            } else {
+                info!(
+                    sl!(),
+                    "FcInner::stop_vm(): fc process isn't running (likely stopped already)"
+                );
+            }
         }

         Ok(())
     }

+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        debug!(sl(), "Wait fc sandbox");
+        let mut waiter = self.exit_waiter.lock().await;
+
+        // Wait until the fc process exits.
+        waiter.0.recv().await;
+
+        let mut fc_process = self.fc_process.lock().await;
+
+        if let Some(mut fc_process) = fc_process.take() {
+            if let Ok(status) = fc_process.wait().await {
+                waiter.1 = status.code().unwrap_or(0);
+            }
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         warn!(sl(), "Pause VM: Not implemented");
         Ok(())

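Firecracker and QEMU share a variant of the pattern that differs from CH and Dragonball above: the stderr logger only signals () on EOF, and wait_vm() then reaps the child itself with wait().await to obtain the real ExitStatus. A reduced sketch of that signal-then-reap ordering; the spawned `true` command is a stand-in for the VMM process, not anything from this PR:

use tokio::process::Command;
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (exit_notify, mut exit_waiter) = mpsc::channel::<()>(1);

    // Stand-in for the fc/qemu child; in the diff it lives in a
    // Mutex<Option<Child>> so wait_vm(&self) can take() it later.
    let child = Mutex::new(Some(Command::new("true").spawn()?));

    // Stand-in for log_fc_stderr()/log_qemu_stderr(): signal once the
    // stderr stream is exhausted (here, immediately).
    tokio::spawn(async move {
        let _ = exit_notify.try_send(());
    });

    // wait_vm() side: wake on the signal, then reap for the real status.
    exit_waiter.recv().await;
    if let Some(mut c) = child.lock().await.take() {
        let status = c.wait().await?;
        println!("exit code: {}", status.code().unwrap_or(0));
    }
    Ok(())
}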
@@ -67,6 +67,11 @@ impl Hypervisor for Firecracker {
         inner.stop_vm().await
     }

+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()

@@ -96,6 +96,7 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync {
     async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()>;
     async fn start_vm(&self, timeout: i32) -> Result<()>;
     async fn stop_vm(&self) -> Result<()>;
+    async fn wait_vm(&self) -> Result<i32>;
     async fn pause_vm(&self) -> Result<()>;
     async fn save_vm(&self) -> Result<()>;
     async fn resume_vm(&self) -> Result<()>;

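With wait_vm() on the trait, any holder of a Hypervisor can block until the VMM dies and react, e.g. by tearing the sandbox down. A hypothetical caller sketch (monitor() is an assumption, not code from this PR; the trait stub below stands in for the one extended above, which is object-safe thanks to #[async_trait]):

use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;

// Minimal stand-in for the trait extended above, reduced to wait_vm().
#[async_trait]
trait Hypervisor: Send + Sync {
    async fn wait_vm(&self) -> Result<i32>;
}

// Hypothetical caller: a sandbox monitor blocks until the VMM exits,
// then surfaces the code so the sandbox can be cleaned up.
async fn monitor(hypervisor: Arc<dyn Hypervisor>) -> Result<()> {
    let exit_code = hypervisor.wait_vm().await?;
    println!("VMM exited with code {exit_code}");
    Ok(())
}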
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 use std::convert::TryInto;
 use std::path::Path;
 use std::process::Stdio;
+use tokio::sync::{mpsc, Mutex};
 use tokio::{
     io::{AsyncBufReadExt, BufReader},
     process::{Child, ChildStderr, Command},
@@ -34,23 +35,31 @@ pub struct QemuInner {
     /// sandbox id
     id: String,

-    qemu_process: Option<Child>,
+    qemu_process: Mutex<Option<Child>>,
     qmp: Option<Qmp>,

     config: HypervisorConfig,
     devices: Vec<DeviceType>,
     netns: Option<String>,

+    exit_notify: Option<mpsc::Sender<()>>,
+    exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
 }

 impl QemuInner {
     pub fn new() -> QemuInner {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         QemuInner {
             id: "".to_string(),
-            qemu_process: None,
+            qemu_process: Mutex::new(None),
             qmp: None,
             config: Default::default(),
             devices: Vec::new(),
             netns: None,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         }
     }

@@ -147,12 +156,18 @@ impl QemuInner {
             });
         }

-        self.qemu_process = Some(command.stderr(Stdio::piped()).spawn()?);
+        let mut qemu_process = command.stderr(Stdio::piped()).spawn()?;
+        let stderr = qemu_process.stderr.take().unwrap();
+        self.qemu_process = Mutex::new(Some(qemu_process));

         info!(sl!(), "qemu process started");

-        if let Some(ref mut qemu_process) = &mut self.qemu_process {
-            tokio::spawn(log_qemu_stderr(qemu_process.stderr.take().unwrap()));
-        }
+        let exit_notify: mpsc::Sender<()> = self
+            .exit_notify
+            .take()
+            .ok_or_else(|| anyhow!("no exit notify"))?;
+
+        tokio::spawn(log_qemu_stderr(stderr, exit_notify));

         match Qmp::new(QMP_SOCKET_FILE) {
             Ok(qmp) => self.qmp = Some(qmp),
@@ -167,7 +182,9 @@ impl QemuInner {

     pub(crate) async fn stop_vm(&mut self) -> Result<()> {
         info!(sl!(), "Stopping QEMU VM");
-        if let Some(ref mut qemu_process) = &mut self.qemu_process {
+
+        let mut qemu_process = self.qemu_process.lock().await;
+        if let Some(qemu_process) = qemu_process.as_mut() {
             let is_qemu_running = qemu_process.id().is_some();
             if is_qemu_running {
                 info!(sl!(), "QemuInner::stop_vm(): kill()'ing qemu");
@@ -184,6 +201,25 @@ impl QemuInner {
         }
     }

+    pub(crate) async fn wait_vm(&self) -> Result<i32> {
+        info!(sl!(), "Wait QEMU VM");
+
+        let mut waiter = self.exit_waiter.lock().await;
+
+        // Wait until the qemu process exits.
+        waiter.0.recv().await;
+
+        let mut qemu_process = self.qemu_process.lock().await;
+
+        if let Some(mut qemu_process) = qemu_process.take() {
+            if let Ok(status) = qemu_process.wait().await {
+                waiter.1 = status.code().unwrap_or(0);
+            }
+        }
+
+        Ok(waiter.1)
+    }
+
     pub(crate) fn pause_vm(&self) -> Result<()> {
         info!(sl!(), "Pausing QEMU VM");
         todo!()
@@ -224,7 +260,7 @@ impl QemuInner {

     pub(crate) async fn get_vmm_master_tid(&self) -> Result<u32> {
         info!(sl!(), "QemuInner::get_vmm_master_tid()");
-        if let Some(qemu_process) = &self.qemu_process {
+        if let Some(qemu_process) = self.qemu_process.lock().await.as_ref() {
             if let Some(qemu_pid) = qemu_process.id() {
                 info!(
                     sl!(),
@@ -491,7 +527,7 @@ impl QemuInner {
     }
 }

-async fn log_qemu_stderr(stderr: ChildStderr) -> Result<()> {
+async fn log_qemu_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Result<()> {
     info!(sl!(), "starting reading qemu stderr");

     let stderr_reader = BufReader::new(stderr);
@@ -505,6 +541,9 @@ async fn log_qemu_stderr(stderr: ChildStderr) -> Result<()> {
         info!(sl!(), "qemu stderr: {:?}", buffer);
     }

+    // Notify the waiter that the process exited.
+    let _ = exit_notify.try_send(());
+
     info!(sl!(), "finished reading qemu stderr");
     Ok(())
 }
@@ -567,13 +606,18 @@ impl Persist for QemuInner {
         _hypervisor_args: Self::ConstructorArgs,
         hypervisor_state: Self::State,
     ) -> Result<Self> {
+        let (exit_notify, exit_waiter) = mpsc::channel(1);
+
         Ok(QemuInner {
             id: hypervisor_state.id,
-            qemu_process: None,
+            qemu_process: Mutex::new(None),
             qmp: None,
             config: hypervisor_state.config,
             devices: Vec::new(),
             netns: None,
+
+            exit_notify: Some(exit_notify),
+            exit_waiter: Mutex::new((exit_waiter, 0)),
         })
     }
 }

@@ -62,6 +62,11 @@ impl Hypervisor for Qemu {
         inner.stop_vm().await
     }

+    async fn wait_vm(&self) -> Result<i32> {
+        let inner = self.inner.read().await;
+        inner.wait_vm().await
+    }
+
     async fn pause_vm(&self) -> Result<()> {
         let inner = self.inner.read().await;
         inner.pause_vm()