mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-30 14:25:43 +00:00
Merge pull request #10330 from lifupan/main_sandboxapi
Some prepared work for sandbox api support
This commit is contained in:
commit
857222af02
@ -16,7 +16,6 @@ use persist::sandbox_persist::Persist;
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::net::UnixStream;
|
||||
use tokio::sync::watch::{channel, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::{process::Child, sync::mpsc};
|
||||
|
||||
@ -79,13 +78,12 @@ pub struct CloudHypervisorInner {
|
||||
pub(crate) _guest_memory_block_size_mb: u32,
|
||||
|
||||
pub(crate) exit_notify: Option<mpsc::Sender<i32>>,
|
||||
pub(crate) exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
|
||||
}
|
||||
|
||||
const CH_DEFAULT_TIMEOUT_SECS: u32 = 10;
|
||||
|
||||
impl CloudHypervisorInner {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(exit_notify: Option<mpsc::Sender<i32>>) -> Self {
|
||||
let mut capabilities = Capabilities::new();
|
||||
capabilities.set(
|
||||
CapabilityBits::BlockDeviceSupport
|
||||
@ -95,7 +93,6 @@ impl CloudHypervisorInner {
|
||||
);
|
||||
|
||||
let (tx, rx) = channel(true);
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
Self {
|
||||
api_socket: None,
|
||||
@ -122,8 +119,7 @@ impl CloudHypervisorInner {
|
||||
ch_features: None,
|
||||
_guest_memory_block_size_mb: 0,
|
||||
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
exit_notify,
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,14 +134,14 @@ impl CloudHypervisorInner {
|
||||
|
||||
impl Default for CloudHypervisorInner {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
Self::new(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Persist for CloudHypervisorInner {
|
||||
type State = HypervisorState;
|
||||
type ConstructorArgs = ();
|
||||
type ConstructorArgs = mpsc::Sender<i32>;
|
||||
|
||||
// Return a state object that will be saved by the caller.
|
||||
async fn save(&self) -> Result<Self::State> {
|
||||
@ -166,11 +162,10 @@ impl Persist for CloudHypervisorInner {
|
||||
|
||||
// Set the hypervisor state to the specified state
|
||||
async fn restore(
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
exit_notify: mpsc::Sender<i32>,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let (tx, rx) = channel(true);
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
let mut ch = Self {
|
||||
config: Some(hypervisor_state.config),
|
||||
@ -190,7 +185,6 @@ impl Persist for CloudHypervisorInner {
|
||||
jailer_root: String::default(),
|
||||
ch_features: None,
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
|
||||
..Default::default()
|
||||
};
|
||||
@ -207,7 +201,9 @@ mod tests {
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_save_clh() {
|
||||
let mut clh = CloudHypervisorInner::new();
|
||||
let (exit_notify, _exit_waiter) = mpsc::channel(1);
|
||||
|
||||
let mut clh = CloudHypervisorInner::new(Some(exit_notify.clone()));
|
||||
clh.id = String::from("123456");
|
||||
clh.netns = Some(String::from("/var/run/netns/testnet"));
|
||||
clh.vm_path = String::from("/opt/kata/bin/cloud-hypervisor");
|
||||
@ -229,7 +225,7 @@ mod tests {
|
||||
assert!(!state.jailed);
|
||||
assert_eq!(state.hypervisor_type, HYPERVISOR_NAME_CH.to_string());
|
||||
|
||||
let clh = CloudHypervisorInner::restore((), state.clone())
|
||||
let clh = CloudHypervisorInner::restore(exit_notify, state.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(clh.id, state.id);
|
||||
|
@ -656,14 +656,9 @@ impl CloudHypervisorInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn wait_vm(&self) -> Result<i32> {
|
||||
debug!(sl!(), "Waiting CH vmm");
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
if let Some(exitcode) = waiter.0.recv().await {
|
||||
waiter.1 = exitcode;
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
pub(crate) fn pause_vm(&self) -> Result<()> {
|
||||
|
@ -12,7 +12,7 @@ use kata_types::capabilities::{Capabilities, CapabilityBits};
|
||||
use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
|
||||
use persist::sandbox_persist::Persist;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{mpsc, Mutex, RwLock};
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
#[macro_export]
|
||||
@ -29,15 +29,19 @@ mod utils;
|
||||
|
||||
use inner::CloudHypervisorInner;
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct CloudHypervisor {
|
||||
inner: Arc<RwLock<CloudHypervisorInner>>,
|
||||
exit_waiter: Mutex<(mpsc::Receiver<i32>, i32)>,
|
||||
}
|
||||
|
||||
impl CloudHypervisor {
|
||||
pub fn new() -> Self {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(CloudHypervisorInner::new())),
|
||||
inner: Arc::new(RwLock::new(CloudHypervisorInner::new(Some(exit_notify)))),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +51,12 @@ impl CloudHypervisor {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CloudHypervisor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Hypervisor for CloudHypervisor {
|
||||
async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()> {
|
||||
@ -65,8 +75,13 @@ impl Hypervisor for CloudHypervisor {
|
||||
}
|
||||
|
||||
async fn wait_vm(&self) -> Result<i32> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.wait_vm().await
|
||||
debug!(sl!(), "Waiting CH vmm");
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
if let Some(exitcode) = waiter.0.recv().await {
|
||||
waiter.1 = exitcode;
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
}
|
||||
|
||||
async fn pause_vm(&self) -> Result<()> {
|
||||
@ -204,12 +219,15 @@ impl Persist for CloudHypervisor {
|
||||
}
|
||||
|
||||
async fn restore(
|
||||
hypervisor_args: Self::ConstructorArgs,
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let inner = CloudHypervisorInner::restore(hypervisor_args, hypervisor_state).await?;
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
let inner = CloudHypervisorInner::restore(exit_notify, hypervisor_state).await?;
|
||||
Ok(Self {
|
||||
inner: Arc::new(RwLock::new(inner)),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -46,14 +46,12 @@ pub struct FcInner {
|
||||
pub(crate) capabilities: Capabilities,
|
||||
pub(crate) fc_process: Mutex<Option<Child>>,
|
||||
pub(crate) exit_notify: Option<mpsc::Sender<()>>,
|
||||
pub(crate) exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
|
||||
}
|
||||
|
||||
impl FcInner {
|
||||
pub fn new() -> FcInner {
|
||||
pub fn new(exit_notify: mpsc::Sender<()>) -> FcInner {
|
||||
let mut capabilities = Capabilities::new();
|
||||
capabilities.set(CapabilityBits::BlockDeviceSupport);
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
FcInner {
|
||||
id: String::default(),
|
||||
@ -71,7 +69,6 @@ impl FcInner {
|
||||
capabilities,
|
||||
fc_process: Mutex::new(None),
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -124,11 +121,10 @@ impl FcInner {
|
||||
let mut child = cmd.stderr(Stdio::piped()).spawn()?;
|
||||
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
let exit_notify: mpsc::Sender<()> = self
|
||||
let exit_notify = self
|
||||
.exit_notify
|
||||
.take()
|
||||
.ok_or_else(|| anyhow!("no exit notify"))?;
|
||||
|
||||
tokio::spawn(log_fc_stderr(stderr, exit_notify));
|
||||
|
||||
match child.id() {
|
||||
@ -216,7 +212,7 @@ async fn log_fc_stderr(stderr: ChildStderr, exit_notify: mpsc::Sender<()>) -> Re
|
||||
#[async_trait]
|
||||
impl Persist for FcInner {
|
||||
type State = HypervisorState;
|
||||
type ConstructorArgs = ();
|
||||
type ConstructorArgs = mpsc::Sender<()>;
|
||||
|
||||
async fn save(&self) -> Result<Self::State> {
|
||||
Ok(HypervisorState {
|
||||
@ -231,12 +227,7 @@ impl Persist for FcInner {
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
async fn restore(
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result<Self> {
|
||||
Ok(FcInner {
|
||||
id: hypervisor_state.id,
|
||||
asock_path: String::default(),
|
||||
@ -253,7 +244,6 @@ impl Persist for FcInner {
|
||||
capabilities: Capabilities::new(),
|
||||
fc_process: Mutex::new(None),
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -102,21 +102,14 @@ impl FcInner {
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_vm(&self) -> Result<i32> {
|
||||
debug!(sl(), "Wait fc sandbox");
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
|
||||
//wait until the fc process exited.
|
||||
waiter.0.recv().await;
|
||||
|
||||
let mut fc_process = self.fc_process.lock().await;
|
||||
|
||||
if let Some(mut fc_process) = fc_process.take() {
|
||||
if let Ok(status) = fc_process.wait().await {
|
||||
waiter.1 = status.code().unwrap_or(0);
|
||||
}
|
||||
let status = fc_process.wait().await?;
|
||||
Ok(status.code().unwrap_or(0))
|
||||
} else {
|
||||
Err(anyhow!("the process has been reaped"))
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
}
|
||||
|
||||
pub(crate) fn pause_vm(&self) -> Result<()> {
|
||||
|
@ -19,11 +19,14 @@ use kata_types::capabilities::Capabilities;
|
||||
use kata_types::capabilities::CapabilityBits;
|
||||
use persist::sandbox_persist::Persist;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Firecracker {
|
||||
inner: Arc<RwLock<FcInner>>,
|
||||
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
|
||||
}
|
||||
|
||||
// Convenience function to set the scope.
|
||||
@ -39,8 +42,11 @@ impl Default for Firecracker {
|
||||
|
||||
impl Firecracker {
|
||||
pub fn new() -> Self {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(FcInner::new())),
|
||||
inner: Arc::new(RwLock::new(FcInner::new(exit_notify))),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,8 +74,18 @@ impl Hypervisor for Firecracker {
|
||||
}
|
||||
|
||||
async fn wait_vm(&self) -> Result<i32> {
|
||||
debug!(sl(), "Wait fc sandbox");
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
|
||||
//wait until the fc process exited.
|
||||
waiter.0.recv().await;
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
inner.wait_vm().await
|
||||
if let Ok(exit_code) = inner.wait_vm().await {
|
||||
waiter.1 = exit_code;
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
}
|
||||
|
||||
async fn pause_vm(&self) -> Result<()> {
|
||||
@ -209,12 +225,15 @@ impl Persist for Firecracker {
|
||||
}
|
||||
/// Restore a component from a specified state.
|
||||
async fn restore(
|
||||
hypervisor_args: Self::ConstructorArgs,
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let inner = FcInner::restore(hypervisor_args, hypervisor_state).await?;
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
let inner = FcInner::restore(exit_notify, hypervisor_state).await?;
|
||||
|
||||
Ok(Self {
|
||||
inner: Arc::new(RwLock::new(inner)),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -43,13 +43,10 @@ pub struct QemuInner {
|
||||
netns: Option<String>,
|
||||
|
||||
exit_notify: Option<mpsc::Sender<()>>,
|
||||
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
|
||||
}
|
||||
|
||||
impl QemuInner {
|
||||
pub fn new() -> QemuInner {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
pub fn new(exit_notify: mpsc::Sender<()>) -> QemuInner {
|
||||
QemuInner {
|
||||
id: "".to_string(),
|
||||
qemu_process: Mutex::new(None),
|
||||
@ -59,7 +56,6 @@ impl QemuInner {
|
||||
netns: None,
|
||||
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -202,22 +198,14 @@ impl QemuInner {
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_vm(&self) -> Result<i32> {
|
||||
info!(sl!(), "Wait QEMU VM");
|
||||
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
|
||||
//wait until the qemu process exited.
|
||||
waiter.0.recv().await;
|
||||
|
||||
let mut qemu_process = self.qemu_process.lock().await;
|
||||
|
||||
if let Some(mut qemu_process) = qemu_process.take() {
|
||||
if let Ok(status) = qemu_process.wait().await {
|
||||
waiter.1 = status.code().unwrap_or(0);
|
||||
}
|
||||
let status = qemu_process.wait().await?;
|
||||
Ok(status.code().unwrap_or(0))
|
||||
} else {
|
||||
Err(anyhow!("the process has been reaped"))
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
}
|
||||
|
||||
pub(crate) fn pause_vm(&self) -> Result<()> {
|
||||
@ -589,7 +577,7 @@ impl QemuInner {
|
||||
#[async_trait]
|
||||
impl Persist for QemuInner {
|
||||
type State = HypervisorState;
|
||||
type ConstructorArgs = ();
|
||||
type ConstructorArgs = mpsc::Sender<()>;
|
||||
|
||||
/// Save a state of hypervisor
|
||||
async fn save(&self) -> Result<Self::State> {
|
||||
@ -602,12 +590,7 @@ impl Persist for QemuInner {
|
||||
}
|
||||
|
||||
/// Restore hypervisor
|
||||
async fn restore(
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
async fn restore(exit_notify: mpsc::Sender<()>, hypervisor_state: Self::State) -> Result<Self> {
|
||||
Ok(QemuInner {
|
||||
id: hypervisor_state.id,
|
||||
qemu_process: Mutex::new(None),
|
||||
@ -617,7 +600,6 @@ impl Persist for QemuInner {
|
||||
netns: None,
|
||||
|
||||
exit_notify: Some(exit_notify),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -20,10 +20,12 @@ use async_trait::async_trait;
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Qemu {
|
||||
inner: Arc<RwLock<QemuInner>>,
|
||||
exit_waiter: Mutex<(mpsc::Receiver<()>, i32)>,
|
||||
}
|
||||
|
||||
impl Default for Qemu {
|
||||
@ -34,8 +36,11 @@ impl Default for Qemu {
|
||||
|
||||
impl Qemu {
|
||||
pub fn new() -> Self {
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(QemuInner::new())),
|
||||
inner: Arc::new(RwLock::new(QemuInner::new(exit_notify))),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,8 +68,19 @@ impl Hypervisor for Qemu {
|
||||
}
|
||||
|
||||
async fn wait_vm(&self) -> Result<i32> {
|
||||
info!(sl!(), "Wait QEMU VM");
|
||||
|
||||
let mut waiter = self.exit_waiter.lock().await;
|
||||
|
||||
//wait until the qemu process exited.
|
||||
waiter.0.recv().await;
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
inner.wait_vm().await
|
||||
if let Ok(exit_code) = inner.wait_vm().await {
|
||||
waiter.1 = exit_code;
|
||||
}
|
||||
|
||||
Ok(waiter.1)
|
||||
}
|
||||
|
||||
async fn pause_vm(&self) -> Result<()> {
|
||||
@ -204,12 +220,15 @@ impl Persist for Qemu {
|
||||
|
||||
/// Restore a component from a specified state.
|
||||
async fn restore(
|
||||
hypervisor_args: Self::ConstructorArgs,
|
||||
_hypervisor_args: Self::ConstructorArgs,
|
||||
hypervisor_state: Self::State,
|
||||
) -> Result<Self> {
|
||||
let inner = QemuInner::restore(hypervisor_args, hypervisor_state).await?;
|
||||
let (exit_notify, exit_waiter) = mpsc::channel(1);
|
||||
|
||||
let inner = QemuInner::restore(exit_notify, hypervisor_state).await?;
|
||||
Ok(Self {
|
||||
inner: Arc::new(RwLock::new(inner)),
|
||||
exit_waiter: Mutex::new((exit_waiter, 0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use crate::types::{ContainerProcess, Response};
|
||||
use crate::types::{ContainerProcess, TaskResponse};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
@ -13,5 +13,5 @@ pub enum Error {
|
||||
#[error("failed to find process {0}")]
|
||||
ProcessNotFound(ContainerProcess),
|
||||
#[error("unexpected response {0} to shim {1}")]
|
||||
UnexpectedResponse(Response, String),
|
||||
UnexpectedResponse(TaskResponse, String),
|
||||
}
|
||||
|
@ -16,10 +16,10 @@ use kata_sys_util::validate;
|
||||
use kata_types::mount::Mount;
|
||||
use strum::Display;
|
||||
|
||||
/// Request: request from shim
|
||||
/// Request and Response messages need to be paired
|
||||
/// TaskRequest: TaskRequest from shim
|
||||
/// TaskRequest and TaskResponse messages need to be paired
|
||||
#[derive(Debug, Clone, Display)]
|
||||
pub enum Request {
|
||||
pub enum TaskRequest {
|
||||
CreateContainer(ContainerConfig),
|
||||
CloseProcessIO(ContainerProcess),
|
||||
DeleteProcess(ContainerProcess),
|
||||
@ -38,10 +38,10 @@ pub enum Request {
|
||||
ConnectContainer(ContainerID),
|
||||
}
|
||||
|
||||
/// Response: response to shim
|
||||
/// Request and Response messages need to be paired
|
||||
/// TaskResponse: TaskResponse to shim
|
||||
/// TaskRequest and TaskResponse messages need to be paired
|
||||
#[derive(Debug, Clone, Display)]
|
||||
pub enum Response {
|
||||
pub enum TaskResponse {
|
||||
CreateContainer(PID),
|
||||
CloseProcessIO,
|
||||
DeleteProcess(ProcessStateInfo),
|
||||
|
@ -5,8 +5,8 @@
|
||||
//
|
||||
|
||||
use super::{
|
||||
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request,
|
||||
ResizePTYRequest, ShutdownRequest, UpdateRequest,
|
||||
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
|
||||
ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest,
|
||||
};
|
||||
use anyhow::{Context, Result};
|
||||
use containerd_shim_protos::api;
|
||||
@ -37,7 +37,7 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::CreateTaskRequest> for Request {
|
||||
impl TryFrom<api::CreateTaskRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::CreateTaskRequest) -> Result<Self> {
|
||||
let options = if from.has_options() {
|
||||
@ -45,7 +45,7 @@ impl TryFrom<api::CreateTaskRequest> for Request {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(Request::CreateContainer(ContainerConfig {
|
||||
Ok(TaskRequest::CreateContainer(ContainerConfig {
|
||||
container_id: from.id.clone(),
|
||||
bundle: from.bundle.clone(),
|
||||
rootfs_mounts: from.rootfs.iter().map(trans_from_shim_mount).collect(),
|
||||
@ -58,29 +58,29 @@ impl TryFrom<api::CreateTaskRequest> for Request {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::CloseIORequest> for Request {
|
||||
impl TryFrom<api::CloseIORequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::CloseIORequest) -> Result<Self> {
|
||||
Ok(Request::CloseProcessIO(
|
||||
Ok(TaskRequest::CloseProcessIO(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::DeleteRequest> for Request {
|
||||
impl TryFrom<api::DeleteRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::DeleteRequest) -> Result<Self> {
|
||||
Ok(Request::DeleteProcess(
|
||||
Ok(TaskRequest::DeleteProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ExecProcessRequest> for Request {
|
||||
impl TryFrom<api::ExecProcessRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ExecProcessRequest) -> Result<Self> {
|
||||
let spec = from.spec();
|
||||
Ok(Request::ExecProcess(ExecProcessRequest {
|
||||
Ok(TaskRequest::ExecProcess(ExecProcessRequest {
|
||||
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
terminal: from.terminal,
|
||||
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
|
||||
@ -92,10 +92,10 @@ impl TryFrom<api::ExecProcessRequest> for Request {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::KillRequest> for Request {
|
||||
impl TryFrom<api::KillRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::KillRequest) -> Result<Self> {
|
||||
Ok(Request::KillProcess(KillRequest {
|
||||
Ok(TaskRequest::KillProcess(KillRequest {
|
||||
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
signal: from.signal,
|
||||
all: from.all,
|
||||
@ -103,47 +103,47 @@ impl TryFrom<api::KillRequest> for Request {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::WaitRequest> for Request {
|
||||
impl TryFrom<api::WaitRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::WaitRequest) -> Result<Self> {
|
||||
Ok(Request::WaitProcess(
|
||||
Ok(TaskRequest::WaitProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StartRequest> for Request {
|
||||
impl TryFrom<api::StartRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StartRequest) -> Result<Self> {
|
||||
Ok(Request::StartProcess(
|
||||
Ok(TaskRequest::StartProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StateRequest> for Request {
|
||||
impl TryFrom<api::StateRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StateRequest) -> Result<Self> {
|
||||
Ok(Request::StateProcess(
|
||||
Ok(TaskRequest::StateProcess(
|
||||
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ShutdownRequest> for Request {
|
||||
impl TryFrom<api::ShutdownRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ShutdownRequest) -> Result<Self> {
|
||||
Ok(Request::ShutdownContainer(ShutdownRequest {
|
||||
Ok(TaskRequest::ShutdownContainer(ShutdownRequest {
|
||||
container_id: from.id.to_string(),
|
||||
is_now: from.now,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ResizePtyRequest> for Request {
|
||||
impl TryFrom<api::ResizePtyRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ResizePtyRequest) -> Result<Self> {
|
||||
Ok(Request::ResizeProcessPTY(ResizePTYRequest {
|
||||
Ok(TaskRequest::ResizeProcessPTY(ResizePTYRequest {
|
||||
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
|
||||
width: from.width,
|
||||
height: from.height,
|
||||
@ -151,47 +151,47 @@ impl TryFrom<api::ResizePtyRequest> for Request {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::PauseRequest> for Request {
|
||||
impl TryFrom<api::PauseRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::PauseRequest) -> Result<Self> {
|
||||
Ok(Request::PauseContainer(ContainerID::new(&from.id)?))
|
||||
Ok(TaskRequest::PauseContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ResumeRequest> for Request {
|
||||
impl TryFrom<api::ResumeRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ResumeRequest) -> Result<Self> {
|
||||
Ok(Request::ResumeContainer(ContainerID::new(&from.id)?))
|
||||
Ok(TaskRequest::ResumeContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::StatsRequest> for Request {
|
||||
impl TryFrom<api::StatsRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::StatsRequest) -> Result<Self> {
|
||||
Ok(Request::StatsContainer(ContainerID::new(&from.id)?))
|
||||
Ok(TaskRequest::StatsContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::UpdateTaskRequest> for Request {
|
||||
impl TryFrom<api::UpdateTaskRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::UpdateTaskRequest) -> Result<Self> {
|
||||
Ok(Request::UpdateContainer(UpdateRequest {
|
||||
Ok(TaskRequest::UpdateContainer(UpdateRequest {
|
||||
container_id: from.id.to_string(),
|
||||
value: from.resources().value.to_vec(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::PidsRequest> for Request {
|
||||
impl TryFrom<api::PidsRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(_from: api::PidsRequest) -> Result<Self> {
|
||||
Ok(Request::Pid)
|
||||
Ok(TaskRequest::Pid)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<api::ConnectRequest> for Request {
|
||||
impl TryFrom<api::ConnectRequest> for TaskRequest {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: api::ConnectRequest) -> Result<Self> {
|
||||
Ok(Request::ConnectContainer(ContainerID::new(&from.id)?))
|
||||
Ok(TaskRequest::ConnectContainer(ContainerID::new(&from.id)?))
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ use std::{
|
||||
use anyhow::{anyhow, Result};
|
||||
use containerd_shim_protos::api;
|
||||
|
||||
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response};
|
||||
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse};
|
||||
use crate::error::Error;
|
||||
|
||||
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp {
|
||||
@ -89,11 +89,11 @@ impl From<ProcessStateInfo> for api::DeleteResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::CreateTaskResponse {
|
||||
impl TryFrom<TaskResponse> for api::CreateTaskResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::CreateContainer(resp) => Ok(Self {
|
||||
TaskResponse::CreateContainer(resp) => Ok(Self {
|
||||
pid: resp.pid,
|
||||
..Default::default()
|
||||
}),
|
||||
@ -105,11 +105,11 @@ impl TryFrom<Response> for api::CreateTaskResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::DeleteResponse {
|
||||
impl TryFrom<TaskResponse> for api::DeleteResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::DeleteProcess(resp) => Ok(resp.into()),
|
||||
TaskResponse::DeleteProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
@ -118,11 +118,11 @@ impl TryFrom<Response> for api::DeleteResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::WaitResponse {
|
||||
impl TryFrom<TaskResponse> for api::WaitResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::WaitProcess(resp) => Ok(resp.into()),
|
||||
TaskResponse::WaitProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
@ -131,11 +131,11 @@ impl TryFrom<Response> for api::WaitResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StartResponse {
|
||||
impl TryFrom<TaskResponse> for api::StartResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::StartProcess(resp) => Ok(api::StartResponse {
|
||||
TaskResponse::StartProcess(resp) => Ok(api::StartResponse {
|
||||
pid: resp.pid,
|
||||
..Default::default()
|
||||
}),
|
||||
@ -147,11 +147,11 @@ impl TryFrom<Response> for api::StartResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StateResponse {
|
||||
impl TryFrom<TaskResponse> for api::StateResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::StateProcess(resp) => Ok(resp.into()),
|
||||
TaskResponse::StateProcess(resp) => Ok(resp.into()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
@ -160,13 +160,13 @@ impl TryFrom<Response> for api::StateResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::StatsResponse {
|
||||
impl TryFrom<TaskResponse> for api::StatsResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
let mut any = ::protobuf::well_known_types::any::Any::new();
|
||||
let mut response = api::StatsResponse::new();
|
||||
match from {
|
||||
Response::StatsContainer(resp) => {
|
||||
TaskResponse::StatsContainer(resp) => {
|
||||
if let Some(value) = resp.value {
|
||||
any.type_url = value.type_url;
|
||||
any.value = value.value;
|
||||
@ -182,11 +182,11 @@ impl TryFrom<Response> for api::StatsResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::PidsResponse {
|
||||
impl TryFrom<TaskResponse> for api::PidsResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::Pid(resp) => {
|
||||
TaskResponse::Pid(resp) => {
|
||||
let mut processes: Vec<api::ProcessInfo> = vec![];
|
||||
let mut p_info = api::ProcessInfo::new();
|
||||
let mut res = api::PidsResponse::new();
|
||||
@ -203,11 +203,11 @@ impl TryFrom<Response> for api::PidsResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::ConnectResponse {
|
||||
impl TryFrom<TaskResponse> for api::ConnectResponse {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::ConnectContainer(resp) => {
|
||||
TaskResponse::ConnectContainer(resp) => {
|
||||
let mut res = api::ConnectResponse::new();
|
||||
res.set_shim_pid(resp.pid);
|
||||
Ok(res)
|
||||
@ -220,18 +220,18 @@ impl TryFrom<Response> for api::ConnectResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<Response> for api::Empty {
|
||||
impl TryFrom<TaskResponse> for api::Empty {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(from: Response) -> Result<Self> {
|
||||
fn try_from(from: TaskResponse) -> Result<Self> {
|
||||
match from {
|
||||
Response::CloseProcessIO => Ok(api::Empty::new()),
|
||||
Response::ExecProcess => Ok(api::Empty::new()),
|
||||
Response::KillProcess => Ok(api::Empty::new()),
|
||||
Response::ShutdownContainer => Ok(api::Empty::new()),
|
||||
Response::PauseContainer => Ok(api::Empty::new()),
|
||||
Response::ResumeContainer => Ok(api::Empty::new()),
|
||||
Response::ResizeProcessPTY => Ok(api::Empty::new()),
|
||||
Response::UpdateContainer => Ok(api::Empty::new()),
|
||||
TaskResponse::CloseProcessIO => Ok(api::Empty::new()),
|
||||
TaskResponse::ExecProcess => Ok(api::Empty::new()),
|
||||
TaskResponse::KillProcess => Ok(api::Empty::new()),
|
||||
TaskResponse::ShutdownContainer => Ok(api::Empty::new()),
|
||||
TaskResponse::PauseContainer => Ok(api::Empty::new()),
|
||||
TaskResponse::ResumeContainer => Ok(api::Empty::new()),
|
||||
TaskResponse::ResizeProcessPTY => Ok(api::Empty::new()),
|
||||
TaskResponse::UpdateContainer => Ok(api::Empty::new()),
|
||||
_ => Err(anyhow!(Error::UnexpectedResponse(
|
||||
from,
|
||||
type_name::<Self>().to_string()
|
||||
|
@ -7,7 +7,7 @@
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use common::{
|
||||
message::Message,
|
||||
types::{Request, Response},
|
||||
types::{TaskRequest, TaskResponse},
|
||||
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
|
||||
};
|
||||
use hypervisor::Param;
|
||||
@ -361,8 +361,8 @@ impl RuntimeHandlerManager {
|
||||
}
|
||||
|
||||
#[instrument(parent = &*(ROOTSPAN))]
|
||||
pub async fn handler_message(&self, req: Request) -> Result<Response> {
|
||||
if let Request::CreateContainer(container_config) = req {
|
||||
pub async fn handler_message(&self, req: TaskRequest) -> Result<TaskResponse> {
|
||||
if let TaskRequest::CreateContainer(container_config) = req {
|
||||
// get oci spec
|
||||
let bundler_path = format!(
|
||||
"{}/{}",
|
||||
@ -393,14 +393,16 @@ impl RuntimeHandlerManager {
|
||||
.await
|
||||
.context("create container")?;
|
||||
|
||||
Ok(Response::CreateContainer(shim_pid))
|
||||
Ok(TaskResponse::CreateContainer(shim_pid))
|
||||
} else {
|
||||
self.handler_request(req).await.context("handler request")
|
||||
self.handler_request(req)
|
||||
.await
|
||||
.context("handler TaskRequest")
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(parent = &(*ROOTSPAN))]
|
||||
pub async fn handler_request(&self, req: Request) -> Result<Response> {
|
||||
pub async fn handler_request(&self, req: TaskRequest) -> Result<TaskResponse> {
|
||||
let instance = self
|
||||
.get_runtime_instance()
|
||||
.await
|
||||
@ -409,24 +411,24 @@ impl RuntimeHandlerManager {
|
||||
let cm = instance.container_manager.clone();
|
||||
|
||||
match req {
|
||||
Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)),
|
||||
Request::CloseProcessIO(process_id) => {
|
||||
TaskRequest::CreateContainer(req) => Err(anyhow!("Unreachable TaskRequest {:?}", req)),
|
||||
TaskRequest::CloseProcessIO(process_id) => {
|
||||
cm.close_process_io(&process_id).await.context("close io")?;
|
||||
Ok(Response::CloseProcessIO)
|
||||
Ok(TaskResponse::CloseProcessIO)
|
||||
}
|
||||
Request::DeleteProcess(process_id) => {
|
||||
TaskRequest::DeleteProcess(process_id) => {
|
||||
let resp = cm.delete_process(&process_id).await.context("do delete")?;
|
||||
Ok(Response::DeleteProcess(resp))
|
||||
Ok(TaskResponse::DeleteProcess(resp))
|
||||
}
|
||||
Request::ExecProcess(req) => {
|
||||
TaskRequest::ExecProcess(req) => {
|
||||
cm.exec_process(req).await.context("exec")?;
|
||||
Ok(Response::ExecProcess)
|
||||
Ok(TaskResponse::ExecProcess)
|
||||
}
|
||||
Request::KillProcess(req) => {
|
||||
TaskRequest::KillProcess(req) => {
|
||||
cm.kill_process(&req).await.context("kill process")?;
|
||||
Ok(Response::KillProcess)
|
||||
Ok(TaskResponse::KillProcess)
|
||||
}
|
||||
Request::ShutdownContainer(req) => {
|
||||
TaskRequest::ShutdownContainer(req) => {
|
||||
if cm.need_shutdown_sandbox(&req).await {
|
||||
sandbox.shutdown().await.context("do shutdown")?;
|
||||
|
||||
@ -435,59 +437,59 @@ impl RuntimeHandlerManager {
|
||||
let tracer = kata_tracer.lock().await;
|
||||
tracer.trace_end();
|
||||
}
|
||||
Ok(Response::ShutdownContainer)
|
||||
Ok(TaskResponse::ShutdownContainer)
|
||||
}
|
||||
Request::WaitProcess(process_id) => {
|
||||
TaskRequest::WaitProcess(process_id) => {
|
||||
let exit_status = cm.wait_process(&process_id).await.context("wait process")?;
|
||||
if cm.is_sandbox_container(&process_id).await {
|
||||
sandbox.stop().await.context("stop sandbox")?;
|
||||
}
|
||||
Ok(Response::WaitProcess(exit_status))
|
||||
Ok(TaskResponse::WaitProcess(exit_status))
|
||||
}
|
||||
Request::StartProcess(process_id) => {
|
||||
TaskRequest::StartProcess(process_id) => {
|
||||
let shim_pid = cm
|
||||
.start_process(&process_id)
|
||||
.await
|
||||
.context("start process")?;
|
||||
Ok(Response::StartProcess(shim_pid))
|
||||
Ok(TaskResponse::StartProcess(shim_pid))
|
||||
}
|
||||
|
||||
Request::StateProcess(process_id) => {
|
||||
TaskRequest::StateProcess(process_id) => {
|
||||
let state = cm
|
||||
.state_process(&process_id)
|
||||
.await
|
||||
.context("state process")?;
|
||||
Ok(Response::StateProcess(state))
|
||||
Ok(TaskResponse::StateProcess(state))
|
||||
}
|
||||
Request::PauseContainer(container_id) => {
|
||||
TaskRequest::PauseContainer(container_id) => {
|
||||
cm.pause_container(&container_id)
|
||||
.await
|
||||
.context("pause container")?;
|
||||
Ok(Response::PauseContainer)
|
||||
Ok(TaskResponse::PauseContainer)
|
||||
}
|
||||
Request::ResumeContainer(container_id) => {
|
||||
TaskRequest::ResumeContainer(container_id) => {
|
||||
cm.resume_container(&container_id)
|
||||
.await
|
||||
.context("resume container")?;
|
||||
Ok(Response::ResumeContainer)
|
||||
Ok(TaskResponse::ResumeContainer)
|
||||
}
|
||||
Request::ResizeProcessPTY(req) => {
|
||||
TaskRequest::ResizeProcessPTY(req) => {
|
||||
cm.resize_process_pty(&req).await.context("resize pty")?;
|
||||
Ok(Response::ResizeProcessPTY)
|
||||
Ok(TaskResponse::ResizeProcessPTY)
|
||||
}
|
||||
Request::StatsContainer(container_id) => {
|
||||
TaskRequest::StatsContainer(container_id) => {
|
||||
let stats = cm
|
||||
.stats_container(&container_id)
|
||||
.await
|
||||
.context("stats container")?;
|
||||
Ok(Response::StatsContainer(stats))
|
||||
Ok(TaskResponse::StatsContainer(stats))
|
||||
}
|
||||
Request::UpdateContainer(req) => {
|
||||
TaskRequest::UpdateContainer(req) => {
|
||||
cm.update_container(req).await.context("update container")?;
|
||||
Ok(Response::UpdateContainer)
|
||||
Ok(TaskResponse::UpdateContainer)
|
||||
}
|
||||
Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)),
|
||||
Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer(
|
||||
TaskRequest::Pid => Ok(TaskResponse::Pid(cm.pid().await.context("pid")?)),
|
||||
TaskRequest::ConnectContainer(container_id) => Ok(TaskResponse::ConnectContainer(
|
||||
cm.connect_container(&container_id)
|
||||
.await
|
||||
.context("connect")?,
|
||||
|
@ -25,7 +25,7 @@ const MESSAGE_BUFFER_SIZE: usize = 8;
|
||||
pub struct ServiceManager {
|
||||
receiver: Option<Receiver<Message>>,
|
||||
handler: Arc<RuntimeHandlerManager>,
|
||||
task_server: Option<Server>,
|
||||
server: Option<Server>,
|
||||
binary: String,
|
||||
address: String,
|
||||
namespace: String,
|
||||
@ -37,7 +37,7 @@ impl std::fmt::Debug for ServiceManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ServiceManager")
|
||||
.field("receiver", &self.receiver)
|
||||
.field("task_server.is_some()", &self.task_server.is_some())
|
||||
.field("server.is_some()", &self.server.is_some())
|
||||
.field("binary", &self.binary)
|
||||
.field("address", &self.address)
|
||||
.field("namespace", &self.namespace)
|
||||
@ -60,8 +60,8 @@ impl ServiceManager {
|
||||
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||
let rt_mgr = RuntimeHandlerManager::new(id, sender).context("new runtime handler")?;
|
||||
let handler = Arc::new(rt_mgr);
|
||||
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||
task_server = task_server.set_domain_unix();
|
||||
let mut server = unsafe { Server::from_raw_fd(task_server_fd) };
|
||||
server = server.set_domain_unix();
|
||||
let event_publisher = new_event_publisher(namespace)
|
||||
.await
|
||||
.context("new event publisher")?;
|
||||
@ -69,7 +69,7 @@ impl ServiceManager {
|
||||
Ok(Self {
|
||||
receiver: Some(receiver),
|
||||
handler,
|
||||
task_server: Some(task_server),
|
||||
server: Some(server),
|
||||
binary: containerd_binary.to_string(),
|
||||
address: address.to_string(),
|
||||
namespace: namespace.to_string(),
|
||||
@ -136,24 +136,24 @@ impl ServiceManager {
|
||||
}
|
||||
|
||||
fn registry_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.take() {
|
||||
if let Some(t) = self.server.take() {
|
||||
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
|
||||
as Box<dyn shim_async::Task + Send + Sync>);
|
||||
let t = t.register_service(shim_async::create_task(task_service));
|
||||
self.task_server = Some(t);
|
||||
self.server = Some(t);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.as_mut() {
|
||||
if let Some(t) = self.server.as_mut() {
|
||||
t.start().await.context("task server start")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_service(&mut self) -> Result<()> {
|
||||
if let Some(t) = self.task_server.as_mut() {
|
||||
if let Some(t) = self.server.as_mut() {
|
||||
t.stop_listen().await;
|
||||
}
|
||||
Ok(())
|
||||
|
@ -10,7 +10,7 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common::types::{Request, Response};
|
||||
use common::types::{TaskRequest, TaskResponse};
|
||||
use containerd_shim_protos::{api, shim_async};
|
||||
use ttrpc::{self, r#async::TtrpcContext};
|
||||
|
||||
@ -31,10 +31,10 @@ impl TaskService {
|
||||
req: TtrpcReq,
|
||||
) -> ttrpc::Result<TtrpcResp>
|
||||
where
|
||||
Request: TryFrom<TtrpcReq>,
|
||||
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
|
||||
TtrpcResp: TryFrom<Response>,
|
||||
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug,
|
||||
TaskRequest: TryFrom<TtrpcReq>,
|
||||
<TaskRequest as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
|
||||
TtrpcResp: TryFrom<TaskResponse>,
|
||||
<TtrpcResp as TryFrom<TaskResponse>>::Error: std::fmt::Debug,
|
||||
{
|
||||
let r = req.try_into().map_err(|err| {
|
||||
ttrpc::Error::Others(format!("failed to translate from shim {:?}", err))
|
||||
|
@ -6,7 +6,7 @@
|
||||
use anyhow::{Context, Result};
|
||||
use common::{
|
||||
message::Message,
|
||||
types::{ContainerConfig, Request},
|
||||
types::{ContainerConfig, TaskRequest},
|
||||
};
|
||||
use runtimes::RuntimeHandlerManager;
|
||||
use tokio::sync::mpsc::channel;
|
||||
@ -18,7 +18,7 @@ async fn real_main() {
|
||||
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
|
||||
let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
|
||||
|
||||
let req = Request::CreateContainer(ContainerConfig {
|
||||
let req = TaskRequest::CreateContainer(ContainerConfig {
|
||||
container_id: "xxx".to_owned(),
|
||||
bundle: ".".to_owned(),
|
||||
rootfs_mounts: Vec::new(),
|
||||
|
Loading…
Reference in New Issue
Block a user