diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs index 8d77920c63..d99321a74a 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs @@ -16,7 +16,6 @@ use persist::sandbox_persist::Persist; use std::collections::HashMap; use std::os::unix::net::UnixStream; use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::{process::Child, sync::mpsc}; @@ -79,13 +78,12 @@ pub struct CloudHypervisorInner { pub(crate) _guest_memory_block_size_mb: u32, pub(crate) exit_notify: Option>, - pub(crate) exit_waiter: Mutex<(mpsc::Receiver, i32)>, } const CH_DEFAULT_TIMEOUT_SECS: u32 = 10; impl CloudHypervisorInner { - pub fn new() -> Self { + pub fn new(exit_notify: Option>) -> Self { let mut capabilities = Capabilities::new(); capabilities.set( CapabilityBits::BlockDeviceSupport @@ -95,7 +93,6 @@ impl CloudHypervisorInner { ); let (tx, rx) = channel(true); - let (exit_notify, exit_waiter) = mpsc::channel(1); Self { api_socket: None, @@ -122,8 +119,7 @@ impl CloudHypervisorInner { ch_features: None, _guest_memory_block_size_mb: 0, - exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), + exit_notify, } } @@ -138,14 +134,14 @@ impl CloudHypervisorInner { impl Default for CloudHypervisorInner { fn default() -> Self { - Self::new() + Self::new(None) } } #[async_trait] impl Persist for CloudHypervisorInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender; // Return a state object that will be saved by the caller. async fn save(&self) -> Result { @@ -166,11 +162,10 @@ impl Persist for CloudHypervisorInner { // Set the hypervisor state to the specified state async fn restore( - _hypervisor_args: Self::ConstructorArgs, + exit_notify: mpsc::Sender, hypervisor_state: Self::State, ) -> Result { let (tx, rx) = channel(true); - let (exit_notify, exit_waiter) = mpsc::channel(1); let mut ch = Self { config: Some(hypervisor_state.config), @@ -190,7 +185,6 @@ impl Persist for CloudHypervisorInner { jailer_root: String::default(), ch_features: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), ..Default::default() }; @@ -207,7 +201,9 @@ mod tests { #[actix_rt::test] async fn test_save_clh() { - let mut clh = CloudHypervisorInner::new(); + let (exit_notify, _exit_waiter) = mpsc::channel(1); + + let mut clh = CloudHypervisorInner::new(Some(exit_notify.clone())); clh.id = String::from("123456"); clh.netns = Some(String::from("/var/run/netns/testnet")); clh.vm_path = String::from("/opt/kata/bin/cloud-hypervisor"); @@ -229,7 +225,7 @@ mod tests { assert!(!state.jailed); assert_eq!(state.hypervisor_type, HYPERVISOR_NAME_CH.to_string()); - let clh = CloudHypervisorInner::restore((), state.clone()) + let clh = CloudHypervisorInner::restore(exit_notify, state.clone()) .await .unwrap(); assert_eq!(clh.id, state.id); diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs index 255e8dff90..1d9598b111 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs @@ -656,14 +656,9 @@ impl CloudHypervisorInner { Ok(()) } + #[allow(dead_code)] pub(crate) async fn wait_vm(&self) -> Result { - debug!(sl!(), "Waiting CH vmm"); - let mut waiter = self.exit_waiter.lock().await; - if let Some(exitcode) = waiter.0.recv().await { - waiter.1 = exitcode; - } - - Ok(waiter.1) + Ok(0) } pub(crate) fn pause_vm(&self) -> Result<()> { diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs index 42f8883b60..adac88a15e 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs @@ -12,7 +12,7 @@ use kata_types::capabilities::{Capabilities, CapabilityBits}; use kata_types::config::hypervisor::Hypervisor as HypervisorConfig; use persist::sandbox_persist::Persist; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, Mutex, RwLock}; // Convenience macro to obtain the scope logger #[macro_export] @@ -29,15 +29,19 @@ mod utils; use inner::CloudHypervisorInner; -#[derive(Debug, Default, Clone)] +#[derive(Debug)] pub struct CloudHypervisor { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver, i32)>, } impl CloudHypervisor { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(CloudHypervisorInner::new())), + inner: Arc::new(RwLock::new(CloudHypervisorInner::new(Some(exit_notify)))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -47,6 +51,12 @@ impl CloudHypervisor { } } +impl Default for CloudHypervisor { + fn default() -> Self { + Self::new() + } +} + #[async_trait] impl Hypervisor for CloudHypervisor { async fn prepare_vm(&self, id: &str, netns: Option) -> Result<()> { @@ -65,8 +75,13 @@ impl Hypervisor for CloudHypervisor { } async fn wait_vm(&self) -> Result { - let inner = self.inner.read().await; - inner.wait_vm().await + debug!(sl!(), "Waiting CH vmm"); + let mut waiter = self.exit_waiter.lock().await; + if let Some(exitcode) = waiter.0.recv().await { + waiter.1 = exitcode; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -204,12 +219,15 @@ impl Persist for CloudHypervisor { } async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = CloudHypervisorInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + + let inner = CloudHypervisorInner::restore(exit_notify, hypervisor_state).await?; Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs index be9b7b786a..45859b5b10 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner.rs @@ -46,14 +46,12 @@ pub struct FcInner { pub(crate) capabilities: Capabilities, pub(crate) fc_process: Mutex>, pub(crate) exit_notify: Option>, - pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl FcInner { - pub fn new() -> FcInner { + pub fn new(exit_notify: mpsc::Sender<()>) -> FcInner { let mut capabilities = Capabilities::new(); capabilities.set(CapabilityBits::BlockDeviceSupport); - let (exit_notify, exit_waiter) = mpsc::channel(1); FcInner { id: String::default(), @@ -71,7 +69,6 @@ impl FcInner { capabilities, fc_process: Mutex::new(None), exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -124,11 +121,10 @@ impl FcInner { let mut child = cmd.stderr(Stdio::piped()).spawn()?; let stderr = child.stderr.take().unwrap(); - let exit_notify: mpsc::Sender<()> = self + let exit_notify = self .exit_notify .take() .ok_or_else(|| anyhow!("no exit notify"))?; - tokio::spawn(log_fc_stderr(stderr, exit_notify)); match child.id() { @@ -216,7 +212,7 @@ async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Re #[async_trait] impl Persist for FcInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender<()>; async fn save(&self) -> Result { Ok(HypervisorState { @@ -231,12 +227,7 @@ impl Persist for FcInner { ..Default::default() }) } - async fn restore( - _hypervisor_args: Self::ConstructorArgs, - hypervisor_state: Self::State, - ) -> Result { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result { Ok(FcInner { id: hypervisor_state.id, asock_path: String::default(), @@ -253,7 +244,6 @@ impl Persist for FcInner { capabilities: Capabilities::new(), fc_process: Mutex::new(None), exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs index bb3bbbff6c..2178e7b2f7 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/inner_hypervisor.rs @@ -102,21 +102,14 @@ impl FcInner { } pub(crate) async fn wait_vm(&self) -> Result { - debug!(sl(), "Wait fc sandbox"); - let mut waiter = self.exit_waiter.lock().await; - - //wait until the fc process exited. - waiter.0.recv().await; - let mut fc_process = self.fc_process.lock().await; if let Some(mut fc_process) = fc_process.take() { - if let Ok(status) = fc_process.wait().await { - waiter.1 = status.code().unwrap_or(0); - } + let status = fc_process.wait().await?; + Ok(status.code().unwrap_or(0)) + } else { + Err(anyhow!("the process has been reaped")) } - - Ok(waiter.1) } pub(crate) fn pause_vm(&self) -> Result<()> { diff --git a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs index adf77964c2..3b65eadd88 100644 --- a/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/firecracker/mod.rs @@ -19,11 +19,14 @@ use kata_types::capabilities::Capabilities; use kata_types::capabilities::CapabilityBits; use persist::sandbox_persist::Persist; use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::Mutex; use tokio::sync::RwLock; #[derive(Debug)] pub struct Firecracker { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } // Convenience function to set the scope. @@ -39,8 +42,11 @@ impl Default for Firecracker { impl Firecracker { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(FcInner::new())), + inner: Arc::new(RwLock::new(FcInner::new(exit_notify))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -68,8 +74,18 @@ impl Hypervisor for Firecracker { } async fn wait_vm(&self) -> Result { + debug!(sl(), "Wait fc sandbox"); + let mut waiter = self.exit_waiter.lock().await; + + //wait until the fc process exited. + waiter.0.recv().await; + let inner = self.inner.read().await; - inner.wait_vm().await + if let Ok(exit_code) = inner.wait_vm().await { + waiter.1 = exit_code; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -209,12 +225,15 @@ impl Persist for Firecracker { } /// Restore a component from a specified state. async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = FcInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + let inner = FcInner::restore(exit_notify, hypervisor_state).await?; + Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs index fde475ca70..e292662a4b 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs @@ -43,13 +43,10 @@ pub struct QemuInner { netns: Option, exit_notify: Option>, - exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl QemuInner { - pub fn new() -> QemuInner { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + pub fn new(exit_notify: mpsc::Sender<()>) -> QemuInner { QemuInner { id: "".to_string(), qemu_process: Mutex::new(None), @@ -59,7 +56,6 @@ impl QemuInner { netns: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -202,22 +198,14 @@ impl QemuInner { } pub(crate) async fn wait_vm(&self) -> Result { - info!(sl!(), "Wait QEMU VM"); - - let mut waiter = self.exit_waiter.lock().await; - - //wait until the qemu process exited. - waiter.0.recv().await; - let mut qemu_process = self.qemu_process.lock().await; if let Some(mut qemu_process) = qemu_process.take() { - if let Ok(status) = qemu_process.wait().await { - waiter.1 = status.code().unwrap_or(0); - } + let status = qemu_process.wait().await?; + Ok(status.code().unwrap_or(0)) + } else { + Err(anyhow!("the process has been reaped")) } - - Ok(waiter.1) } pub(crate) fn pause_vm(&self) -> Result<()> { @@ -589,7 +577,7 @@ impl QemuInner { #[async_trait] impl Persist for QemuInner { type State = HypervisorState; - type ConstructorArgs = (); + type ConstructorArgs = mpsc::Sender<()>; /// Save a state of hypervisor async fn save(&self) -> Result { @@ -602,12 +590,7 @@ impl Persist for QemuInner { } /// Restore hypervisor - async fn restore( - _hypervisor_args: Self::ConstructorArgs, - hypervisor_state: Self::State, - ) -> Result { - let (exit_notify, exit_waiter) = mpsc::channel(1); - + async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result { Ok(QemuInner { id: hypervisor_state.id, qemu_process: Mutex::new(None), @@ -617,7 +600,6 @@ impl Persist for QemuInner { netns: None, exit_notify: Some(exit_notify), - exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs index 32497c3584..6a6c923cab 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs @@ -20,10 +20,12 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::sync::RwLock; +use tokio::sync::{mpsc, Mutex}; #[derive(Debug)] pub struct Qemu { inner: Arc>, + exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>, } impl Default for Qemu { @@ -34,8 +36,11 @@ impl Default for Qemu { impl Qemu { pub fn new() -> Self { + let (exit_notify, exit_waiter) = mpsc::channel(1); + Self { - inner: Arc::new(RwLock::new(QemuInner::new())), + inner: Arc::new(RwLock::new(QemuInner::new(exit_notify))), + exit_waiter: Mutex::new((exit_waiter, 0)), } } @@ -63,8 +68,19 @@ impl Hypervisor for Qemu { } async fn wait_vm(&self) -> Result { + info!(sl!(), "Wait QEMU VM"); + + let mut waiter = self.exit_waiter.lock().await; + + //wait until the qemu process exited. + waiter.0.recv().await; + let inner = self.inner.read().await; - inner.wait_vm().await + if let Ok(exit_code) = inner.wait_vm().await { + waiter.1 = exit_code; + } + + Ok(waiter.1) } async fn pause_vm(&self) -> Result<()> { @@ -204,12 +220,15 @@ impl Persist for Qemu { /// Restore a component from a specified state. async fn restore( - hypervisor_args: Self::ConstructorArgs, + _hypervisor_args: Self::ConstructorArgs, hypervisor_state: Self::State, ) -> Result { - let inner = QemuInner::restore(hypervisor_args, hypervisor_state).await?; + let (exit_notify, exit_waiter) = mpsc::channel(1); + + let inner = QemuInner::restore(exit_notify, hypervisor_state).await?; Ok(Self { inner: Arc::new(RwLock::new(inner)), + exit_waiter: Mutex::new((exit_waiter, 0)), }) } } diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs index 2ec03c4c6c..a64ff64773 100644 --- a/src/runtime-rs/crates/runtimes/common/src/error.rs +++ b/src/runtime-rs/crates/runtimes/common/src/error.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::types::{ContainerProcess, Response}; +use crate::types::{ContainerProcess, TaskResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -13,5 +13,5 @@ pub enum Error { #[error("failed to find process {0}")] ProcessNotFound(ContainerProcess), #[error("unexpected response {0} to shim {1}")] - UnexpectedResponse(Response, String), + UnexpectedResponse(TaskResponse, String), } diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index 0e6f80a4f6..4d339d660a 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -16,10 +16,10 @@ use kata_sys_util::validate; use kata_types::mount::Mount; use strum::Display; -/// Request: request from shim -/// Request and Response messages need to be paired +/// TaskRequest: TaskRequest from shim +/// TaskRequest and TaskResponse messages need to be paired #[derive(Debug, Clone, Display)] -pub enum Request { +pub enum TaskRequest { CreateContainer(ContainerConfig), CloseProcessIO(ContainerProcess), DeleteProcess(ContainerProcess), @@ -38,10 +38,10 @@ pub enum Request { ConnectContainer(ContainerID), } -/// Response: response to shim -/// Request and Response messages need to be paired +/// TaskResponse: TaskResponse to shim +/// TaskRequest and TaskResponse messages need to be paired #[derive(Debug, Clone, Display)] -pub enum Response { +pub enum TaskResponse { CreateContainer(PID), CloseProcessIO, DeleteProcess(ProcessStateInfo), diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs index 29a4a676ce..a35ea96cf3 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -5,8 +5,8 @@ // use super::{ - ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request, - ResizePTYRequest, ShutdownRequest, UpdateRequest, + ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, + ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest, }; use anyhow::{Context, Result}; use containerd_shim_protos::api; @@ -37,7 +37,7 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CreateTaskRequest) -> Result { let options = if from.has_options() { @@ -45,7 +45,7 @@ impl TryFrom for Request { } else { None }; - Ok(Request::CreateContainer(ContainerConfig { + Ok(TaskRequest::CreateContainer(ContainerConfig { container_id: from.id.clone(), bundle: from.bundle.clone(), rootfs_mounts: from.rootfs.iter().map(trans_from_shim_mount).collect(), @@ -58,29 +58,29 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CloseIORequest) -> Result { - Ok(Request::CloseProcessIO( + Ok(TaskRequest::CloseProcessIO( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::DeleteRequest) -> Result { - Ok(Request::DeleteProcess( + Ok(TaskRequest::DeleteProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ExecProcessRequest) -> Result { let spec = from.spec(); - Ok(Request::ExecProcess(ExecProcessRequest { + Ok(TaskRequest::ExecProcess(ExecProcessRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, terminal: from.terminal, stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), @@ -92,10 +92,10 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::KillRequest) -> Result { - Ok(Request::KillProcess(KillRequest { + Ok(TaskRequest::KillProcess(KillRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, signal: from.signal, all: from.all, @@ -103,47 +103,47 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::WaitRequest) -> Result { - Ok(Request::WaitProcess( + Ok(TaskRequest::WaitProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StartRequest) -> Result { - Ok(Request::StartProcess( + Ok(TaskRequest::StartProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StateRequest) -> Result { - Ok(Request::StateProcess( + Ok(TaskRequest::StateProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ShutdownRequest) -> Result { - Ok(Request::ShutdownContainer(ShutdownRequest { + Ok(TaskRequest::ShutdownContainer(ShutdownRequest { container_id: from.id.to_string(), is_now: from.now, })) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ResizePtyRequest) -> Result { - Ok(Request::ResizeProcessPTY(ResizePTYRequest { + Ok(TaskRequest::ResizeProcessPTY(ResizePTYRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, width: from.width, height: from.height, @@ -151,47 +151,47 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::PauseRequest) -> Result { - Ok(Request::PauseContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::PauseContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ResumeRequest) -> Result { - Ok(Request::ResumeContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::ResumeContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StatsRequest) -> Result { - Ok(Request::StatsContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::StatsContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::UpdateTaskRequest) -> Result { - Ok(Request::UpdateContainer(UpdateRequest { + Ok(TaskRequest::UpdateContainer(UpdateRequest { container_id: from.id.to_string(), value: from.resources().value.to_vec(), })) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(_from: api::PidsRequest) -> Result { - Ok(Request::Pid) + Ok(TaskRequest::Pid) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ConnectRequest) -> Result { - Ok(Request::ConnectContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::ConnectContainer(ContainerID::new(&from.id)?)) } } diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index 5f758f5676..d738665199 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -13,7 +13,7 @@ use std::{ use anyhow::{anyhow, Result}; use containerd_shim_protos::api; -use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response}; +use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse}; use crate::error::Error; fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp { @@ -89,11 +89,11 @@ impl From for api::DeleteResponse { } } -impl TryFrom for api::CreateTaskResponse { +impl TryFrom for api::CreateTaskResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::CreateContainer(resp) => Ok(Self { + TaskResponse::CreateContainer(resp) => Ok(Self { pid: resp.pid, ..Default::default() }), @@ -105,11 +105,11 @@ impl TryFrom for api::CreateTaskResponse { } } -impl TryFrom for api::DeleteResponse { +impl TryFrom for api::DeleteResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::DeleteProcess(resp) => Ok(resp.into()), + TaskResponse::DeleteProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -118,11 +118,11 @@ impl TryFrom for api::DeleteResponse { } } -impl TryFrom for api::WaitResponse { +impl TryFrom for api::WaitResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::WaitProcess(resp) => Ok(resp.into()), + TaskResponse::WaitProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -131,11 +131,11 @@ impl TryFrom for api::WaitResponse { } } -impl TryFrom for api::StartResponse { +impl TryFrom for api::StartResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::StartProcess(resp) => Ok(api::StartResponse { + TaskResponse::StartProcess(resp) => Ok(api::StartResponse { pid: resp.pid, ..Default::default() }), @@ -147,11 +147,11 @@ impl TryFrom for api::StartResponse { } } -impl TryFrom for api::StateResponse { +impl TryFrom for api::StateResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::StateProcess(resp) => Ok(resp.into()), + TaskResponse::StateProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -160,13 +160,13 @@ impl TryFrom for api::StateResponse { } } -impl TryFrom for api::StatsResponse { +impl TryFrom for api::StatsResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { let mut any = ::protobuf::well_known_types::any::Any::new(); let mut response = api::StatsResponse::new(); match from { - Response::StatsContainer(resp) => { + TaskResponse::StatsContainer(resp) => { if let Some(value) = resp.value { any.type_url = value.type_url; any.value = value.value; @@ -182,11 +182,11 @@ impl TryFrom for api::StatsResponse { } } -impl TryFrom for api::PidsResponse { +impl TryFrom for api::PidsResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::Pid(resp) => { + TaskResponse::Pid(resp) => { let mut processes: Vec = vec![]; let mut p_info = api::ProcessInfo::new(); let mut res = api::PidsResponse::new(); @@ -203,11 +203,11 @@ impl TryFrom for api::PidsResponse { } } -impl TryFrom for api::ConnectResponse { +impl TryFrom for api::ConnectResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::ConnectContainer(resp) => { + TaskResponse::ConnectContainer(resp) => { let mut res = api::ConnectResponse::new(); res.set_shim_pid(resp.pid); Ok(res) @@ -220,18 +220,18 @@ impl TryFrom for api::ConnectResponse { } } -impl TryFrom for api::Empty { +impl TryFrom for api::Empty { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::CloseProcessIO => Ok(api::Empty::new()), - Response::ExecProcess => Ok(api::Empty::new()), - Response::KillProcess => Ok(api::Empty::new()), - Response::ShutdownContainer => Ok(api::Empty::new()), - Response::PauseContainer => Ok(api::Empty::new()), - Response::ResumeContainer => Ok(api::Empty::new()), - Response::ResizeProcessPTY => Ok(api::Empty::new()), - Response::UpdateContainer => Ok(api::Empty::new()), + TaskResponse::CloseProcessIO => Ok(api::Empty::new()), + TaskResponse::ExecProcess => Ok(api::Empty::new()), + TaskResponse::KillProcess => Ok(api::Empty::new()), + TaskResponse::ShutdownContainer => Ok(api::Empty::new()), + TaskResponse::PauseContainer => Ok(api::Empty::new()), + TaskResponse::ResumeContainer => Ok(api::Empty::new()), + TaskResponse::ResizeProcessPTY => Ok(api::Empty::new()), + TaskResponse::UpdateContainer => Ok(api::Empty::new()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 684157617e..ba61a7c3ff 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Context, Result}; use common::{ message::Message, - types::{Request, Response}, + types::{TaskRequest, TaskResponse}, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, }; use hypervisor::Param; @@ -361,8 +361,8 @@ impl RuntimeHandlerManager { } #[instrument(parent = &*(ROOTSPAN))] - pub async fn handler_message(&self, req: Request) -> Result { - if let Request::CreateContainer(container_config) = req { + pub async fn handler_message(&self, req: TaskRequest) -> Result { + if let TaskRequest::CreateContainer(container_config) = req { // get oci spec let bundler_path = format!( "{}/{}", @@ -393,14 +393,16 @@ impl RuntimeHandlerManager { .await .context("create container")?; - Ok(Response::CreateContainer(shim_pid)) + Ok(TaskResponse::CreateContainer(shim_pid)) } else { - self.handler_request(req).await.context("handler request") + self.handler_request(req) + .await + .context("handler TaskRequest") } } #[instrument(parent = &(*ROOTSPAN))] - pub async fn handler_request(&self, req: Request) -> Result { + pub async fn handler_request(&self, req: TaskRequest) -> Result { let instance = self .get_runtime_instance() .await @@ -409,24 +411,24 @@ impl RuntimeHandlerManager { let cm = instance.container_manager.clone(); match req { - Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)), - Request::CloseProcessIO(process_id) => { + TaskRequest::CreateContainer(req) => Err(anyhow!("Unreachable TaskRequest {:?}", req)), + TaskRequest::CloseProcessIO(process_id) => { cm.close_process_io(&process_id).await.context("close io")?; - Ok(Response::CloseProcessIO) + Ok(TaskResponse::CloseProcessIO) } - Request::DeleteProcess(process_id) => { + TaskRequest::DeleteProcess(process_id) => { let resp = cm.delete_process(&process_id).await.context("do delete")?; - Ok(Response::DeleteProcess(resp)) + Ok(TaskResponse::DeleteProcess(resp)) } - Request::ExecProcess(req) => { + TaskRequest::ExecProcess(req) => { cm.exec_process(req).await.context("exec")?; - Ok(Response::ExecProcess) + Ok(TaskResponse::ExecProcess) } - Request::KillProcess(req) => { + TaskRequest::KillProcess(req) => { cm.kill_process(&req).await.context("kill process")?; - Ok(Response::KillProcess) + Ok(TaskResponse::KillProcess) } - Request::ShutdownContainer(req) => { + TaskRequest::ShutdownContainer(req) => { if cm.need_shutdown_sandbox(&req).await { sandbox.shutdown().await.context("do shutdown")?; @@ -435,59 +437,59 @@ impl RuntimeHandlerManager { let tracer = kata_tracer.lock().await; tracer.trace_end(); } - Ok(Response::ShutdownContainer) + Ok(TaskResponse::ShutdownContainer) } - Request::WaitProcess(process_id) => { + TaskRequest::WaitProcess(process_id) => { let exit_status = cm.wait_process(&process_id).await.context("wait process")?; if cm.is_sandbox_container(&process_id).await { sandbox.stop().await.context("stop sandbox")?; } - Ok(Response::WaitProcess(exit_status)) + Ok(TaskResponse::WaitProcess(exit_status)) } - Request::StartProcess(process_id) => { + TaskRequest::StartProcess(process_id) => { let shim_pid = cm .start_process(&process_id) .await .context("start process")?; - Ok(Response::StartProcess(shim_pid)) + Ok(TaskResponse::StartProcess(shim_pid)) } - Request::StateProcess(process_id) => { + TaskRequest::StateProcess(process_id) => { let state = cm .state_process(&process_id) .await .context("state process")?; - Ok(Response::StateProcess(state)) + Ok(TaskResponse::StateProcess(state)) } - Request::PauseContainer(container_id) => { + TaskRequest::PauseContainer(container_id) => { cm.pause_container(&container_id) .await .context("pause container")?; - Ok(Response::PauseContainer) + Ok(TaskResponse::PauseContainer) } - Request::ResumeContainer(container_id) => { + TaskRequest::ResumeContainer(container_id) => { cm.resume_container(&container_id) .await .context("resume container")?; - Ok(Response::ResumeContainer) + Ok(TaskResponse::ResumeContainer) } - Request::ResizeProcessPTY(req) => { + TaskRequest::ResizeProcessPTY(req) => { cm.resize_process_pty(&req).await.context("resize pty")?; - Ok(Response::ResizeProcessPTY) + Ok(TaskResponse::ResizeProcessPTY) } - Request::StatsContainer(container_id) => { + TaskRequest::StatsContainer(container_id) => { let stats = cm .stats_container(&container_id) .await .context("stats container")?; - Ok(Response::StatsContainer(stats)) + Ok(TaskResponse::StatsContainer(stats)) } - Request::UpdateContainer(req) => { + TaskRequest::UpdateContainer(req) => { cm.update_container(req).await.context("update container")?; - Ok(Response::UpdateContainer) + Ok(TaskResponse::UpdateContainer) } - Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)), - Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer( + TaskRequest::Pid => Ok(TaskResponse::Pid(cm.pid().await.context("pid")?)), + TaskRequest::ConnectContainer(container_id) => Ok(TaskResponse::ConnectContainer( cm.connect_container(&container_id) .await .context("connect")?, diff --git a/src/runtime-rs/crates/service/src/manager.rs b/src/runtime-rs/crates/service/src/manager.rs index 5a28b17c54..8542a0c9e9 100644 --- a/src/runtime-rs/crates/service/src/manager.rs +++ b/src/runtime-rs/crates/service/src/manager.rs @@ -25,7 +25,7 @@ const MESSAGE_BUFFER_SIZE: usize = 8; pub struct ServiceManager { receiver: Option>, handler: Arc, - task_server: Option, + server: Option, binary: String, address: String, namespace: String, @@ -37,7 +37,7 @@ impl std::fmt::Debug for ServiceManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ServiceManager") .field("receiver", &self.receiver) - .field("task_server.is_some()", &self.task_server.is_some()) + .field("server.is_some()", &self.server.is_some()) .field("binary", &self.binary) .field("address", &self.address) .field("namespace", &self.namespace) @@ -60,8 +60,8 @@ impl ServiceManager { let (sender, receiver) = channel::(MESSAGE_BUFFER_SIZE); let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?; let handler = Arc::new(rt_mgr); - let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) }; - task_server = task_server.set_domain_unix(); + let mut server = unsafe { Server::from_raw_fd(task_server_fd) }; + server = server.set_domain_unix(); let event_publisher = new_event_publisher(namespace) .await .context("new event publisher")?; @@ -69,7 +69,7 @@ impl ServiceManager { Ok(Self { receiver: Some(receiver), handler, - task_server: Some(task_server), + server: Some(server), binary: containerd_binary.to_string(), address: address.to_string(), namespace: namespace.to_string(), @@ -136,24 +136,24 @@ impl ServiceManager { } fn registry_service(&mut self) -> Result<()> { - if let Some(t) = self.task_server.take() { + if let Some(t) = self.server.take() { let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone())) as Box); let t = t.register_service(shim_async::create_task(task_service)); - self.task_server = Some(t); + self.server = Some(t); } Ok(()) } async fn start_service(&mut self) -> Result<()> { - if let Some(t) = self.task_server.as_mut() { + if let Some(t) = self.server.as_mut() { t.start().await.context("task server start")?; } Ok(()) } async fn stop_service(&mut self) -> Result<()> { - if let Some(t) = self.task_server.as_mut() { + if let Some(t) = self.server.as_mut() { t.stop_listen().await; } Ok(()) diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index 630d6493c8..9e47595b01 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -10,7 +10,7 @@ use std::{ }; use async_trait::async_trait; -use common::types::{Request, Response}; +use common::types::{TaskRequest, TaskResponse}; use containerd_shim_protos::{api, shim_async}; use ttrpc::{self, r#async::TtrpcContext}; @@ -31,10 +31,10 @@ impl TaskService { req: TtrpcReq, ) -> ttrpc::Result where - Request: TryFrom, - >::Error: std::fmt::Debug, - TtrpcResp: TryFrom, - >::Error: std::fmt::Debug, + TaskRequest: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, { let r = req.try_into().map_err(|err| { ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index 76506fec2d..28f1b8a7e5 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -6,7 +6,7 @@ use anyhow::{Context, Result}; use common::{ message::Message, - types::{ContainerConfig, Request}, + types::{ContainerConfig, TaskRequest}, }; use runtimes::RuntimeHandlerManager; use tokio::sync::mpsc::channel; @@ -18,7 +18,7 @@ async fn real_main() { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); let manager = RuntimeHandlerManager::new("xxx", sender).unwrap(); - let req = Request::CreateContainer(ContainerConfig { + let req = TaskRequest::CreateContainer(ContainerConfig { container_id: "xxx".to_owned(), bundle: ".".to_owned(), rootfs_mounts: Vec::new(),