runtime-rs: rename the Request/Response to TaskRequest/TaskResponse

In order to make different from sandbox request/response, this commit
changed the task request/response to TaskRequest/TaskResponse.

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
Fupan Li 2024-08-31 17:49:40 +08:00
parent ba94eed891
commit 20b4be0225
7 changed files with 121 additions and 119 deletions

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use crate::types::{ContainerProcess, Response}; use crate::types::{ContainerProcess, TaskResponse};
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum Error { pub enum Error {
@ -13,5 +13,5 @@ pub enum Error {
#[error("failed to find process {0}")] #[error("failed to find process {0}")]
ProcessNotFound(ContainerProcess), ProcessNotFound(ContainerProcess),
#[error("unexpected response {0} to shim {1}")] #[error("unexpected response {0} to shim {1}")]
UnexpectedResponse(Response, String), UnexpectedResponse(TaskResponse, String),
} }

View File

@ -16,10 +16,10 @@ use kata_sys_util::validate;
use kata_types::mount::Mount; use kata_types::mount::Mount;
use strum::Display; use strum::Display;
/// Request: request from shim /// TaskRequest: TaskRequest from shim
/// Request and Response messages need to be paired /// TaskRequest and TaskResponse messages need to be paired
#[derive(Debug, Clone, Display)] #[derive(Debug, Clone, Display)]
pub enum Request { pub enum TaskRequest {
CreateContainer(ContainerConfig), CreateContainer(ContainerConfig),
CloseProcessIO(ContainerProcess), CloseProcessIO(ContainerProcess),
DeleteProcess(ContainerProcess), DeleteProcess(ContainerProcess),
@ -38,10 +38,10 @@ pub enum Request {
ConnectContainer(ContainerID), ConnectContainer(ContainerID),
} }
/// Response: response to shim /// TaskResponse: TaskResponse to shim
/// Request and Response messages need to be paired /// TaskRequest and TaskResponse messages need to be paired
#[derive(Debug, Clone, Display)] #[derive(Debug, Clone, Display)]
pub enum Response { pub enum TaskResponse {
CreateContainer(PID), CreateContainer(PID),
CloseProcessIO, CloseProcessIO,
DeleteProcess(ProcessStateInfo), DeleteProcess(ProcessStateInfo),

View File

@ -5,8 +5,8 @@
// //
use super::{ use super::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request, ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
ResizePTYRequest, ShutdownRequest, UpdateRequest, ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest,
}; };
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use containerd_shim_protos::api; use containerd_shim_protos::api;
@ -37,7 +37,7 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount {
} }
} }
impl TryFrom<api::CreateTaskRequest> for Request { impl TryFrom<api::CreateTaskRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::CreateTaskRequest) -> Result<Self> { fn try_from(from: api::CreateTaskRequest) -> Result<Self> {
let options = if from.has_options() { let options = if from.has_options() {
@ -45,7 +45,7 @@ impl TryFrom<api::CreateTaskRequest> for Request {
} else { } else {
None None
}; };
Ok(Request::CreateContainer(ContainerConfig { Ok(TaskRequest::CreateContainer(ContainerConfig {
container_id: from.id.clone(), container_id: from.id.clone(),
bundle: from.bundle.clone(), bundle: from.bundle.clone(),
rootfs_mounts: from.rootfs.iter().map(trans_from_shim_mount).collect(), rootfs_mounts: from.rootfs.iter().map(trans_from_shim_mount).collect(),
@ -58,29 +58,29 @@ impl TryFrom<api::CreateTaskRequest> for Request {
} }
} }
impl TryFrom<api::CloseIORequest> for Request { impl TryFrom<api::CloseIORequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::CloseIORequest) -> Result<Self> { fn try_from(from: api::CloseIORequest) -> Result<Self> {
Ok(Request::CloseProcessIO( Ok(TaskRequest::CloseProcessIO(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
)) ))
} }
} }
impl TryFrom<api::DeleteRequest> for Request { impl TryFrom<api::DeleteRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::DeleteRequest) -> Result<Self> { fn try_from(from: api::DeleteRequest) -> Result<Self> {
Ok(Request::DeleteProcess( Ok(TaskRequest::DeleteProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
)) ))
} }
} }
impl TryFrom<api::ExecProcessRequest> for Request { impl TryFrom<api::ExecProcessRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::ExecProcessRequest) -> Result<Self> { fn try_from(from: api::ExecProcessRequest) -> Result<Self> {
let spec = from.spec(); let spec = from.spec();
Ok(Request::ExecProcess(ExecProcessRequest { Ok(TaskRequest::ExecProcess(ExecProcessRequest {
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
terminal: from.terminal, terminal: from.terminal,
stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()),
@ -92,10 +92,10 @@ impl TryFrom<api::ExecProcessRequest> for Request {
} }
} }
impl TryFrom<api::KillRequest> for Request { impl TryFrom<api::KillRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::KillRequest) -> Result<Self> { fn try_from(from: api::KillRequest) -> Result<Self> {
Ok(Request::KillProcess(KillRequest { Ok(TaskRequest::KillProcess(KillRequest {
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
signal: from.signal, signal: from.signal,
all: from.all, all: from.all,
@ -103,47 +103,47 @@ impl TryFrom<api::KillRequest> for Request {
} }
} }
impl TryFrom<api::WaitRequest> for Request { impl TryFrom<api::WaitRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::WaitRequest) -> Result<Self> { fn try_from(from: api::WaitRequest) -> Result<Self> {
Ok(Request::WaitProcess( Ok(TaskRequest::WaitProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
)) ))
} }
} }
impl TryFrom<api::StartRequest> for Request { impl TryFrom<api::StartRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::StartRequest) -> Result<Self> { fn try_from(from: api::StartRequest) -> Result<Self> {
Ok(Request::StartProcess( Ok(TaskRequest::StartProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
)) ))
} }
} }
impl TryFrom<api::StateRequest> for Request { impl TryFrom<api::StateRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::StateRequest) -> Result<Self> { fn try_from(from: api::StateRequest) -> Result<Self> {
Ok(Request::StateProcess( Ok(TaskRequest::StateProcess(
ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
)) ))
} }
} }
impl TryFrom<api::ShutdownRequest> for Request { impl TryFrom<api::ShutdownRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::ShutdownRequest) -> Result<Self> { fn try_from(from: api::ShutdownRequest) -> Result<Self> {
Ok(Request::ShutdownContainer(ShutdownRequest { Ok(TaskRequest::ShutdownContainer(ShutdownRequest {
container_id: from.id.to_string(), container_id: from.id.to_string(),
is_now: from.now, is_now: from.now,
})) }))
} }
} }
impl TryFrom<api::ResizePtyRequest> for Request { impl TryFrom<api::ResizePtyRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::ResizePtyRequest) -> Result<Self> { fn try_from(from: api::ResizePtyRequest) -> Result<Self> {
Ok(Request::ResizeProcessPTY(ResizePTYRequest { Ok(TaskRequest::ResizeProcessPTY(ResizePTYRequest {
process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?,
width: from.width, width: from.width,
height: from.height, height: from.height,
@ -151,47 +151,47 @@ impl TryFrom<api::ResizePtyRequest> for Request {
} }
} }
impl TryFrom<api::PauseRequest> for Request { impl TryFrom<api::PauseRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::PauseRequest) -> Result<Self> { fn try_from(from: api::PauseRequest) -> Result<Self> {
Ok(Request::PauseContainer(ContainerID::new(&from.id)?)) Ok(TaskRequest::PauseContainer(ContainerID::new(&from.id)?))
} }
} }
impl TryFrom<api::ResumeRequest> for Request { impl TryFrom<api::ResumeRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::ResumeRequest) -> Result<Self> { fn try_from(from: api::ResumeRequest) -> Result<Self> {
Ok(Request::ResumeContainer(ContainerID::new(&from.id)?)) Ok(TaskRequest::ResumeContainer(ContainerID::new(&from.id)?))
} }
} }
impl TryFrom<api::StatsRequest> for Request { impl TryFrom<api::StatsRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::StatsRequest) -> Result<Self> { fn try_from(from: api::StatsRequest) -> Result<Self> {
Ok(Request::StatsContainer(ContainerID::new(&from.id)?)) Ok(TaskRequest::StatsContainer(ContainerID::new(&from.id)?))
} }
} }
impl TryFrom<api::UpdateTaskRequest> for Request { impl TryFrom<api::UpdateTaskRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::UpdateTaskRequest) -> Result<Self> { fn try_from(from: api::UpdateTaskRequest) -> Result<Self> {
Ok(Request::UpdateContainer(UpdateRequest { Ok(TaskRequest::UpdateContainer(UpdateRequest {
container_id: from.id.to_string(), container_id: from.id.to_string(),
value: from.resources().value.to_vec(), value: from.resources().value.to_vec(),
})) }))
} }
} }
impl TryFrom<api::PidsRequest> for Request { impl TryFrom<api::PidsRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(_from: api::PidsRequest) -> Result<Self> { fn try_from(_from: api::PidsRequest) -> Result<Self> {
Ok(Request::Pid) Ok(TaskRequest::Pid)
} }
} }
impl TryFrom<api::ConnectRequest> for Request { impl TryFrom<api::ConnectRequest> for TaskRequest {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: api::ConnectRequest) -> Result<Self> { fn try_from(from: api::ConnectRequest) -> Result<Self> {
Ok(Request::ConnectContainer(ContainerID::new(&from.id)?)) Ok(TaskRequest::ConnectContainer(ContainerID::new(&from.id)?))
} }
} }

View File

@ -13,7 +13,7 @@ use std::{
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use containerd_shim_protos::api; use containerd_shim_protos::api;
use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response}; use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse};
use crate::error::Error; use crate::error::Error;
fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp { fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp {
@ -89,11 +89,11 @@ impl From<ProcessStateInfo> for api::DeleteResponse {
} }
} }
impl TryFrom<Response> for api::CreateTaskResponse { impl TryFrom<TaskResponse> for api::CreateTaskResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::CreateContainer(resp) => Ok(Self { TaskResponse::CreateContainer(resp) => Ok(Self {
pid: resp.pid, pid: resp.pid,
..Default::default() ..Default::default()
}), }),
@ -105,11 +105,11 @@ impl TryFrom<Response> for api::CreateTaskResponse {
} }
} }
impl TryFrom<Response> for api::DeleteResponse { impl TryFrom<TaskResponse> for api::DeleteResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::DeleteProcess(resp) => Ok(resp.into()), TaskResponse::DeleteProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse( _ => Err(anyhow!(Error::UnexpectedResponse(
from, from,
type_name::<Self>().to_string() type_name::<Self>().to_string()
@ -118,11 +118,11 @@ impl TryFrom<Response> for api::DeleteResponse {
} }
} }
impl TryFrom<Response> for api::WaitResponse { impl TryFrom<TaskResponse> for api::WaitResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::WaitProcess(resp) => Ok(resp.into()), TaskResponse::WaitProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse( _ => Err(anyhow!(Error::UnexpectedResponse(
from, from,
type_name::<Self>().to_string() type_name::<Self>().to_string()
@ -131,11 +131,11 @@ impl TryFrom<Response> for api::WaitResponse {
} }
} }
impl TryFrom<Response> for api::StartResponse { impl TryFrom<TaskResponse> for api::StartResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::StartProcess(resp) => Ok(api::StartResponse { TaskResponse::StartProcess(resp) => Ok(api::StartResponse {
pid: resp.pid, pid: resp.pid,
..Default::default() ..Default::default()
}), }),
@ -147,11 +147,11 @@ impl TryFrom<Response> for api::StartResponse {
} }
} }
impl TryFrom<Response> for api::StateResponse { impl TryFrom<TaskResponse> for api::StateResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::StateProcess(resp) => Ok(resp.into()), TaskResponse::StateProcess(resp) => Ok(resp.into()),
_ => Err(anyhow!(Error::UnexpectedResponse( _ => Err(anyhow!(Error::UnexpectedResponse(
from, from,
type_name::<Self>().to_string() type_name::<Self>().to_string()
@ -160,13 +160,13 @@ impl TryFrom<Response> for api::StateResponse {
} }
} }
impl TryFrom<Response> for api::StatsResponse { impl TryFrom<TaskResponse> for api::StatsResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
let mut any = ::protobuf::well_known_types::any::Any::new(); let mut any = ::protobuf::well_known_types::any::Any::new();
let mut response = api::StatsResponse::new(); let mut response = api::StatsResponse::new();
match from { match from {
Response::StatsContainer(resp) => { TaskResponse::StatsContainer(resp) => {
if let Some(value) = resp.value { if let Some(value) = resp.value {
any.type_url = value.type_url; any.type_url = value.type_url;
any.value = value.value; any.value = value.value;
@ -182,11 +182,11 @@ impl TryFrom<Response> for api::StatsResponse {
} }
} }
impl TryFrom<Response> for api::PidsResponse { impl TryFrom<TaskResponse> for api::PidsResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::Pid(resp) => { TaskResponse::Pid(resp) => {
let mut processes: Vec<api::ProcessInfo> = vec![]; let mut processes: Vec<api::ProcessInfo> = vec![];
let mut p_info = api::ProcessInfo::new(); let mut p_info = api::ProcessInfo::new();
let mut res = api::PidsResponse::new(); let mut res = api::PidsResponse::new();
@ -203,11 +203,11 @@ impl TryFrom<Response> for api::PidsResponse {
} }
} }
impl TryFrom<Response> for api::ConnectResponse { impl TryFrom<TaskResponse> for api::ConnectResponse {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::ConnectContainer(resp) => { TaskResponse::ConnectContainer(resp) => {
let mut res = api::ConnectResponse::new(); let mut res = api::ConnectResponse::new();
res.set_shim_pid(resp.pid); res.set_shim_pid(resp.pid);
Ok(res) Ok(res)
@ -220,18 +220,18 @@ impl TryFrom<Response> for api::ConnectResponse {
} }
} }
impl TryFrom<Response> for api::Empty { impl TryFrom<TaskResponse> for api::Empty {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(from: Response) -> Result<Self> { fn try_from(from: TaskResponse) -> Result<Self> {
match from { match from {
Response::CloseProcessIO => Ok(api::Empty::new()), TaskResponse::CloseProcessIO => Ok(api::Empty::new()),
Response::ExecProcess => Ok(api::Empty::new()), TaskResponse::ExecProcess => Ok(api::Empty::new()),
Response::KillProcess => Ok(api::Empty::new()), TaskResponse::KillProcess => Ok(api::Empty::new()),
Response::ShutdownContainer => Ok(api::Empty::new()), TaskResponse::ShutdownContainer => Ok(api::Empty::new()),
Response::PauseContainer => Ok(api::Empty::new()), TaskResponse::PauseContainer => Ok(api::Empty::new()),
Response::ResumeContainer => Ok(api::Empty::new()), TaskResponse::ResumeContainer => Ok(api::Empty::new()),
Response::ResizeProcessPTY => Ok(api::Empty::new()), TaskResponse::ResizeProcessPTY => Ok(api::Empty::new()),
Response::UpdateContainer => Ok(api::Empty::new()), TaskResponse::UpdateContainer => Ok(api::Empty::new()),
_ => Err(anyhow!(Error::UnexpectedResponse( _ => Err(anyhow!(Error::UnexpectedResponse(
from, from,
type_name::<Self>().to_string() type_name::<Self>().to_string()

View File

@ -7,7 +7,7 @@
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use common::{ use common::{
message::Message, message::Message,
types::{Request, Response}, types::{TaskRequest, TaskResponse},
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
}; };
use hypervisor::Param; use hypervisor::Param;
@ -361,8 +361,8 @@ impl RuntimeHandlerManager {
} }
#[instrument(parent = &*(ROOTSPAN))] #[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_message(&self, req: Request) -> Result<Response> { pub async fn handler_message(&self, req: TaskRequest) -> Result<TaskResponse> {
if let Request::CreateContainer(container_config) = req { if let TaskRequest::CreateContainer(container_config) = req {
// get oci spec // get oci spec
let bundler_path = format!( let bundler_path = format!(
"{}/{}", "{}/{}",
@ -393,14 +393,16 @@ impl RuntimeHandlerManager {
.await .await
.context("create container")?; .context("create container")?;
Ok(Response::CreateContainer(shim_pid)) Ok(TaskResponse::CreateContainer(shim_pid))
} else { } else {
self.handler_request(req).await.context("handler request") self.handler_request(req)
.await
.context("handler TaskRequest")
} }
} }
#[instrument(parent = &(*ROOTSPAN))] #[instrument(parent = &(*ROOTSPAN))]
pub async fn handler_request(&self, req: Request) -> Result<Response> { pub async fn handler_request(&self, req: TaskRequest) -> Result<TaskResponse> {
let instance = self let instance = self
.get_runtime_instance() .get_runtime_instance()
.await .await
@ -409,24 +411,24 @@ impl RuntimeHandlerManager {
let cm = instance.container_manager.clone(); let cm = instance.container_manager.clone();
match req { match req {
Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)), TaskRequest::CreateContainer(req) => Err(anyhow!("Unreachable TaskRequest {:?}", req)),
Request::CloseProcessIO(process_id) => { TaskRequest::CloseProcessIO(process_id) => {
cm.close_process_io(&process_id).await.context("close io")?; cm.close_process_io(&process_id).await.context("close io")?;
Ok(Response::CloseProcessIO) Ok(TaskResponse::CloseProcessIO)
} }
Request::DeleteProcess(process_id) => { TaskRequest::DeleteProcess(process_id) => {
let resp = cm.delete_process(&process_id).await.context("do delete")?; let resp = cm.delete_process(&process_id).await.context("do delete")?;
Ok(Response::DeleteProcess(resp)) Ok(TaskResponse::DeleteProcess(resp))
} }
Request::ExecProcess(req) => { TaskRequest::ExecProcess(req) => {
cm.exec_process(req).await.context("exec")?; cm.exec_process(req).await.context("exec")?;
Ok(Response::ExecProcess) Ok(TaskResponse::ExecProcess)
} }
Request::KillProcess(req) => { TaskRequest::KillProcess(req) => {
cm.kill_process(&req).await.context("kill process")?; cm.kill_process(&req).await.context("kill process")?;
Ok(Response::KillProcess) Ok(TaskResponse::KillProcess)
} }
Request::ShutdownContainer(req) => { TaskRequest::ShutdownContainer(req) => {
if cm.need_shutdown_sandbox(&req).await { if cm.need_shutdown_sandbox(&req).await {
sandbox.shutdown().await.context("do shutdown")?; sandbox.shutdown().await.context("do shutdown")?;
@ -435,59 +437,59 @@ impl RuntimeHandlerManager {
let tracer = kata_tracer.lock().await; let tracer = kata_tracer.lock().await;
tracer.trace_end(); tracer.trace_end();
} }
Ok(Response::ShutdownContainer) Ok(TaskResponse::ShutdownContainer)
} }
Request::WaitProcess(process_id) => { TaskRequest::WaitProcess(process_id) => {
let exit_status = cm.wait_process(&process_id).await.context("wait process")?; let exit_status = cm.wait_process(&process_id).await.context("wait process")?;
if cm.is_sandbox_container(&process_id).await { if cm.is_sandbox_container(&process_id).await {
sandbox.stop().await.context("stop sandbox")?; sandbox.stop().await.context("stop sandbox")?;
} }
Ok(Response::WaitProcess(exit_status)) Ok(TaskResponse::WaitProcess(exit_status))
} }
Request::StartProcess(process_id) => { TaskRequest::StartProcess(process_id) => {
let shim_pid = cm let shim_pid = cm
.start_process(&process_id) .start_process(&process_id)
.await .await
.context("start process")?; .context("start process")?;
Ok(Response::StartProcess(shim_pid)) Ok(TaskResponse::StartProcess(shim_pid))
} }
Request::StateProcess(process_id) => { TaskRequest::StateProcess(process_id) => {
let state = cm let state = cm
.state_process(&process_id) .state_process(&process_id)
.await .await
.context("state process")?; .context("state process")?;
Ok(Response::StateProcess(state)) Ok(TaskResponse::StateProcess(state))
} }
Request::PauseContainer(container_id) => { TaskRequest::PauseContainer(container_id) => {
cm.pause_container(&container_id) cm.pause_container(&container_id)
.await .await
.context("pause container")?; .context("pause container")?;
Ok(Response::PauseContainer) Ok(TaskResponse::PauseContainer)
} }
Request::ResumeContainer(container_id) => { TaskRequest::ResumeContainer(container_id) => {
cm.resume_container(&container_id) cm.resume_container(&container_id)
.await .await
.context("resume container")?; .context("resume container")?;
Ok(Response::ResumeContainer) Ok(TaskResponse::ResumeContainer)
} }
Request::ResizeProcessPTY(req) => { TaskRequest::ResizeProcessPTY(req) => {
cm.resize_process_pty(&req).await.context("resize pty")?; cm.resize_process_pty(&req).await.context("resize pty")?;
Ok(Response::ResizeProcessPTY) Ok(TaskResponse::ResizeProcessPTY)
} }
Request::StatsContainer(container_id) => { TaskRequest::StatsContainer(container_id) => {
let stats = cm let stats = cm
.stats_container(&container_id) .stats_container(&container_id)
.await .await
.context("stats container")?; .context("stats container")?;
Ok(Response::StatsContainer(stats)) Ok(TaskResponse::StatsContainer(stats))
} }
Request::UpdateContainer(req) => { TaskRequest::UpdateContainer(req) => {
cm.update_container(req).await.context("update container")?; cm.update_container(req).await.context("update container")?;
Ok(Response::UpdateContainer) Ok(TaskResponse::UpdateContainer)
} }
Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)), TaskRequest::Pid => Ok(TaskResponse::Pid(cm.pid().await.context("pid")?)),
Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer( TaskRequest::ConnectContainer(container_id) => Ok(TaskResponse::ConnectContainer(
cm.connect_container(&container_id) cm.connect_container(&container_id)
.await .await
.context("connect")?, .context("connect")?,

View File

@ -10,7 +10,7 @@ use std::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
use common::types::{Request, Response}; use common::types::{TaskRequest, TaskResponse};
use containerd_shim_protos::{api, shim_async}; use containerd_shim_protos::{api, shim_async};
use ttrpc::{self, r#async::TtrpcContext}; use ttrpc::{self, r#async::TtrpcContext};
@ -31,10 +31,10 @@ impl TaskService {
req: TtrpcReq, req: TtrpcReq,
) -> ttrpc::Result<TtrpcResp> ) -> ttrpc::Result<TtrpcResp>
where where
Request: TryFrom<TtrpcReq>, TaskRequest: TryFrom<TtrpcReq>,
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug, <TaskRequest as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
TtrpcResp: TryFrom<Response>, TtrpcResp: TryFrom<TaskResponse>,
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug, <TtrpcResp as TryFrom<TaskResponse>>::Error: std::fmt::Debug,
{ {
let r = req.try_into().map_err(|err| { let r = req.try_into().map_err(|err| {
ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) ttrpc::Error::Others(format!("failed to translate from shim {:?}", err))

View File

@ -6,7 +6,7 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use common::{ use common::{
message::Message, message::Message,
types::{ContainerConfig, Request}, types::{ContainerConfig, TaskRequest},
}; };
use runtimes::RuntimeHandlerManager; use runtimes::RuntimeHandlerManager;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
@ -18,7 +18,7 @@ async fn real_main() {
let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE); let (sender, _receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let manager = RuntimeHandlerManager::new("xxx", sender).unwrap(); let manager = RuntimeHandlerManager::new("xxx", sender).unwrap();
let req = Request::CreateContainer(ContainerConfig { let req = TaskRequest::CreateContainer(ContainerConfig {
container_id: "xxx".to_owned(), container_id: "xxx".to_owned(),
bundle: ".".to_owned(), bundle: ".".to_owned(),
rootfs_mounts: Vec::new(), rootfs_mounts: Vec::new(),