diff --git a/src/runtime-rs/crates/runtimes/common/src/error.rs b/src/runtime-rs/crates/runtimes/common/src/error.rs index 2ec03c4c6c..a64ff64773 100644 --- a/src/runtime-rs/crates/runtimes/common/src/error.rs +++ b/src/runtime-rs/crates/runtimes/common/src/error.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::types::{ContainerProcess, Response}; +use crate::types::{ContainerProcess, TaskResponse}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -13,5 +13,5 @@ pub enum Error { #[error("failed to find process {0}")] ProcessNotFound(ContainerProcess), #[error("unexpected response {0} to shim {1}")] - UnexpectedResponse(Response, String), + UnexpectedResponse(TaskResponse, String), } diff --git a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs index 0e6f80a4f6..4d339d660a 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/mod.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/mod.rs @@ -16,10 +16,10 @@ use kata_sys_util::validate; use kata_types::mount::Mount; use strum::Display; -/// Request: request from shim -/// Request and Response messages need to be paired +/// TaskRequest: TaskRequest from shim +/// TaskRequest and TaskResponse messages need to be paired #[derive(Debug, Clone, Display)] -pub enum Request { +pub enum TaskRequest { CreateContainer(ContainerConfig), CloseProcessIO(ContainerProcess), DeleteProcess(ContainerProcess), @@ -38,10 +38,10 @@ pub enum Request { ConnectContainer(ContainerID), } -/// Response: response to shim -/// Request and Response messages need to be paired +/// TaskResponse: TaskResponse to shim +/// TaskRequest and TaskResponse messages need to be paired #[derive(Debug, Clone, Display)] -pub enum Response { +pub enum TaskResponse { CreateContainer(PID), CloseProcessIO, DeleteProcess(ProcessStateInfo), diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs index 29a4a676ce..a35ea96cf3 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_from_shim.rs @@ -5,8 +5,8 @@ // use super::{ - ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, Request, - ResizePTYRequest, ShutdownRequest, UpdateRequest, + ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, + ResizePTYRequest, ShutdownRequest, TaskRequest, UpdateRequest, }; use anyhow::{Context, Result}; use containerd_shim_protos::api; @@ -37,7 +37,7 @@ fn trans_from_shim_mount(from: &api::Mount) -> Mount { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CreateTaskRequest) -> Result { let options = if from.has_options() { @@ -45,7 +45,7 @@ impl TryFrom for Request { } else { None }; - Ok(Request::CreateContainer(ContainerConfig { + Ok(TaskRequest::CreateContainer(ContainerConfig { container_id: from.id.clone(), bundle: from.bundle.clone(), rootfs_mounts: from.rootfs.iter().map(trans_from_shim_mount).collect(), @@ -58,29 +58,29 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::CloseIORequest) -> Result { - Ok(Request::CloseProcessIO( + Ok(TaskRequest::CloseProcessIO( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::DeleteRequest) -> Result { - Ok(Request::DeleteProcess( + Ok(TaskRequest::DeleteProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ExecProcessRequest) -> Result { let spec = from.spec(); - Ok(Request::ExecProcess(ExecProcessRequest { + Ok(TaskRequest::ExecProcess(ExecProcessRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, terminal: from.terminal, stdin: (!from.stdin.is_empty()).then(|| from.stdin.clone()), @@ -92,10 +92,10 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::KillRequest) -> Result { - Ok(Request::KillProcess(KillRequest { + Ok(TaskRequest::KillProcess(KillRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, signal: from.signal, all: from.all, @@ -103,47 +103,47 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::WaitRequest) -> Result { - Ok(Request::WaitProcess( + Ok(TaskRequest::WaitProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StartRequest) -> Result { - Ok(Request::StartProcess( + Ok(TaskRequest::StartProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StateRequest) -> Result { - Ok(Request::StateProcess( + Ok(TaskRequest::StateProcess( ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, )) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ShutdownRequest) -> Result { - Ok(Request::ShutdownContainer(ShutdownRequest { + Ok(TaskRequest::ShutdownContainer(ShutdownRequest { container_id: from.id.to_string(), is_now: from.now, })) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ResizePtyRequest) -> Result { - Ok(Request::ResizeProcessPTY(ResizePTYRequest { + Ok(TaskRequest::ResizeProcessPTY(ResizePTYRequest { process: ContainerProcess::new(&from.id, &from.exec_id).context("new process id")?, width: from.width, height: from.height, @@ -151,47 +151,47 @@ impl TryFrom for Request { } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::PauseRequest) -> Result { - Ok(Request::PauseContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::PauseContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ResumeRequest) -> Result { - Ok(Request::ResumeContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::ResumeContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::StatsRequest) -> Result { - Ok(Request::StatsContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::StatsContainer(ContainerID::new(&from.id)?)) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::UpdateTaskRequest) -> Result { - Ok(Request::UpdateContainer(UpdateRequest { + Ok(TaskRequest::UpdateContainer(UpdateRequest { container_id: from.id.to_string(), value: from.resources().value.to_vec(), })) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(_from: api::PidsRequest) -> Result { - Ok(Request::Pid) + Ok(TaskRequest::Pid) } } -impl TryFrom for Request { +impl TryFrom for TaskRequest { type Error = anyhow::Error; fn try_from(from: api::ConnectRequest) -> Result { - Ok(Request::ConnectContainer(ContainerID::new(&from.id)?)) + Ok(TaskRequest::ConnectContainer(ContainerID::new(&from.id)?)) } } diff --git a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs index 5f758f5676..d738665199 100644 --- a/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs +++ b/src/runtime-rs/crates/runtimes/common/src/types/trans_into_shim.rs @@ -13,7 +13,7 @@ use std::{ use anyhow::{anyhow, Result}; use containerd_shim_protos::api; -use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, Response}; +use super::{ProcessExitStatus, ProcessStateInfo, ProcessStatus, TaskResponse}; use crate::error::Error; fn system_time_into(time: time::SystemTime) -> ::protobuf::well_known_types::timestamp::Timestamp { @@ -89,11 +89,11 @@ impl From for api::DeleteResponse { } } -impl TryFrom for api::CreateTaskResponse { +impl TryFrom for api::CreateTaskResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::CreateContainer(resp) => Ok(Self { + TaskResponse::CreateContainer(resp) => Ok(Self { pid: resp.pid, ..Default::default() }), @@ -105,11 +105,11 @@ impl TryFrom for api::CreateTaskResponse { } } -impl TryFrom for api::DeleteResponse { +impl TryFrom for api::DeleteResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::DeleteProcess(resp) => Ok(resp.into()), + TaskResponse::DeleteProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -118,11 +118,11 @@ impl TryFrom for api::DeleteResponse { } } -impl TryFrom for api::WaitResponse { +impl TryFrom for api::WaitResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::WaitProcess(resp) => Ok(resp.into()), + TaskResponse::WaitProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -131,11 +131,11 @@ impl TryFrom for api::WaitResponse { } } -impl TryFrom for api::StartResponse { +impl TryFrom for api::StartResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::StartProcess(resp) => Ok(api::StartResponse { + TaskResponse::StartProcess(resp) => Ok(api::StartResponse { pid: resp.pid, ..Default::default() }), @@ -147,11 +147,11 @@ impl TryFrom for api::StartResponse { } } -impl TryFrom for api::StateResponse { +impl TryFrom for api::StateResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::StateProcess(resp) => Ok(resp.into()), + TaskResponse::StateProcess(resp) => Ok(resp.into()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() @@ -160,13 +160,13 @@ impl TryFrom for api::StateResponse { } } -impl TryFrom for api::StatsResponse { +impl TryFrom for api::StatsResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { let mut any = ::protobuf::well_known_types::any::Any::new(); let mut response = api::StatsResponse::new(); match from { - Response::StatsContainer(resp) => { + TaskResponse::StatsContainer(resp) => { if let Some(value) = resp.value { any.type_url = value.type_url; any.value = value.value; @@ -182,11 +182,11 @@ impl TryFrom for api::StatsResponse { } } -impl TryFrom for api::PidsResponse { +impl TryFrom for api::PidsResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::Pid(resp) => { + TaskResponse::Pid(resp) => { let mut processes: Vec = vec![]; let mut p_info = api::ProcessInfo::new(); let mut res = api::PidsResponse::new(); @@ -203,11 +203,11 @@ impl TryFrom for api::PidsResponse { } } -impl TryFrom for api::ConnectResponse { +impl TryFrom for api::ConnectResponse { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::ConnectContainer(resp) => { + TaskResponse::ConnectContainer(resp) => { let mut res = api::ConnectResponse::new(); res.set_shim_pid(resp.pid); Ok(res) @@ -220,18 +220,18 @@ impl TryFrom for api::ConnectResponse { } } -impl TryFrom for api::Empty { +impl TryFrom for api::Empty { type Error = anyhow::Error; - fn try_from(from: Response) -> Result { + fn try_from(from: TaskResponse) -> Result { match from { - Response::CloseProcessIO => Ok(api::Empty::new()), - Response::ExecProcess => Ok(api::Empty::new()), - Response::KillProcess => Ok(api::Empty::new()), - Response::ShutdownContainer => Ok(api::Empty::new()), - Response::PauseContainer => Ok(api::Empty::new()), - Response::ResumeContainer => Ok(api::Empty::new()), - Response::ResizeProcessPTY => Ok(api::Empty::new()), - Response::UpdateContainer => Ok(api::Empty::new()), + TaskResponse::CloseProcessIO => Ok(api::Empty::new()), + TaskResponse::ExecProcess => Ok(api::Empty::new()), + TaskResponse::KillProcess => Ok(api::Empty::new()), + TaskResponse::ShutdownContainer => Ok(api::Empty::new()), + TaskResponse::PauseContainer => Ok(api::Empty::new()), + TaskResponse::ResumeContainer => Ok(api::Empty::new()), + TaskResponse::ResizeProcessPTY => Ok(api::Empty::new()), + TaskResponse::UpdateContainer => Ok(api::Empty::new()), _ => Err(anyhow!(Error::UnexpectedResponse( from, type_name::().to_string() diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 684157617e..ba61a7c3ff 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, Context, Result}; use common::{ message::Message, - types::{Request, Response}, + types::{TaskRequest, TaskResponse}, RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv, }; use hypervisor::Param; @@ -361,8 +361,8 @@ impl RuntimeHandlerManager { } #[instrument(parent = &*(ROOTSPAN))] - pub async fn handler_message(&self, req: Request) -> Result { - if let Request::CreateContainer(container_config) = req { + pub async fn handler_message(&self, req: TaskRequest) -> Result { + if let TaskRequest::CreateContainer(container_config) = req { // get oci spec let bundler_path = format!( "{}/{}", @@ -393,14 +393,16 @@ impl RuntimeHandlerManager { .await .context("create container")?; - Ok(Response::CreateContainer(shim_pid)) + Ok(TaskResponse::CreateContainer(shim_pid)) } else { - self.handler_request(req).await.context("handler request") + self.handler_request(req) + .await + .context("handler TaskRequest") } } #[instrument(parent = &(*ROOTSPAN))] - pub async fn handler_request(&self, req: Request) -> Result { + pub async fn handler_request(&self, req: TaskRequest) -> Result { let instance = self .get_runtime_instance() .await @@ -409,24 +411,24 @@ impl RuntimeHandlerManager { let cm = instance.container_manager.clone(); match req { - Request::CreateContainer(req) => Err(anyhow!("Unreachable request {:?}", req)), - Request::CloseProcessIO(process_id) => { + TaskRequest::CreateContainer(req) => Err(anyhow!("Unreachable TaskRequest {:?}", req)), + TaskRequest::CloseProcessIO(process_id) => { cm.close_process_io(&process_id).await.context("close io")?; - Ok(Response::CloseProcessIO) + Ok(TaskResponse::CloseProcessIO) } - Request::DeleteProcess(process_id) => { + TaskRequest::DeleteProcess(process_id) => { let resp = cm.delete_process(&process_id).await.context("do delete")?; - Ok(Response::DeleteProcess(resp)) + Ok(TaskResponse::DeleteProcess(resp)) } - Request::ExecProcess(req) => { + TaskRequest::ExecProcess(req) => { cm.exec_process(req).await.context("exec")?; - Ok(Response::ExecProcess) + Ok(TaskResponse::ExecProcess) } - Request::KillProcess(req) => { + TaskRequest::KillProcess(req) => { cm.kill_process(&req).await.context("kill process")?; - Ok(Response::KillProcess) + Ok(TaskResponse::KillProcess) } - Request::ShutdownContainer(req) => { + TaskRequest::ShutdownContainer(req) => { if cm.need_shutdown_sandbox(&req).await { sandbox.shutdown().await.context("do shutdown")?; @@ -435,59 +437,59 @@ impl RuntimeHandlerManager { let tracer = kata_tracer.lock().await; tracer.trace_end(); } - Ok(Response::ShutdownContainer) + Ok(TaskResponse::ShutdownContainer) } - Request::WaitProcess(process_id) => { + TaskRequest::WaitProcess(process_id) => { let exit_status = cm.wait_process(&process_id).await.context("wait process")?; if cm.is_sandbox_container(&process_id).await { sandbox.stop().await.context("stop sandbox")?; } - Ok(Response::WaitProcess(exit_status)) + Ok(TaskResponse::WaitProcess(exit_status)) } - Request::StartProcess(process_id) => { + TaskRequest::StartProcess(process_id) => { let shim_pid = cm .start_process(&process_id) .await .context("start process")?; - Ok(Response::StartProcess(shim_pid)) + Ok(TaskResponse::StartProcess(shim_pid)) } - Request::StateProcess(process_id) => { + TaskRequest::StateProcess(process_id) => { let state = cm .state_process(&process_id) .await .context("state process")?; - Ok(Response::StateProcess(state)) + Ok(TaskResponse::StateProcess(state)) } - Request::PauseContainer(container_id) => { + TaskRequest::PauseContainer(container_id) => { cm.pause_container(&container_id) .await .context("pause container")?; - Ok(Response::PauseContainer) + Ok(TaskResponse::PauseContainer) } - Request::ResumeContainer(container_id) => { + TaskRequest::ResumeContainer(container_id) => { cm.resume_container(&container_id) .await .context("resume container")?; - Ok(Response::ResumeContainer) + Ok(TaskResponse::ResumeContainer) } - Request::ResizeProcessPTY(req) => { + TaskRequest::ResizeProcessPTY(req) => { cm.resize_process_pty(&req).await.context("resize pty")?; - Ok(Response::ResizeProcessPTY) + Ok(TaskResponse::ResizeProcessPTY) } - Request::StatsContainer(container_id) => { + TaskRequest::StatsContainer(container_id) => { let stats = cm .stats_container(&container_id) .await .context("stats container")?; - Ok(Response::StatsContainer(stats)) + Ok(TaskResponse::StatsContainer(stats)) } - Request::UpdateContainer(req) => { + TaskRequest::UpdateContainer(req) => { cm.update_container(req).await.context("update container")?; - Ok(Response::UpdateContainer) + Ok(TaskResponse::UpdateContainer) } - Request::Pid => Ok(Response::Pid(cm.pid().await.context("pid")?)), - Request::ConnectContainer(container_id) => Ok(Response::ConnectContainer( + TaskRequest::Pid => Ok(TaskResponse::Pid(cm.pid().await.context("pid")?)), + TaskRequest::ConnectContainer(container_id) => Ok(TaskResponse::ConnectContainer( cm.connect_container(&container_id) .await .context("connect")?, diff --git a/src/runtime-rs/crates/service/src/task_service.rs b/src/runtime-rs/crates/service/src/task_service.rs index 630d6493c8..9e47595b01 100644 --- a/src/runtime-rs/crates/service/src/task_service.rs +++ b/src/runtime-rs/crates/service/src/task_service.rs @@ -10,7 +10,7 @@ use std::{ }; use async_trait::async_trait; -use common::types::{Request, Response}; +use common::types::{TaskRequest, TaskResponse}; use containerd_shim_protos::{api, shim_async}; use ttrpc::{self, r#async::TtrpcContext}; @@ -31,10 +31,10 @@ impl TaskService { req: TtrpcReq, ) -> ttrpc::Result where - Request: TryFrom, - >::Error: std::fmt::Debug, - TtrpcResp: TryFrom, - >::Error: std::fmt::Debug, + TaskRequest: TryFrom, + >::Error: std::fmt::Debug, + TtrpcResp: TryFrom, + >::Error: std::fmt::Debug, { let r = req.try_into().map_err(|err| { ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)) diff --git a/src/runtime-rs/crates/shim-ctl/src/main.rs b/src/runtime-rs/crates/shim-ctl/src/main.rs index 76506fec2d..28f1b8a7e5 100644 --- a/src/runtime-rs/crates/shim-ctl/src/main.rs +++ b/src/runtime-rs/crates/shim-ctl/src/main.rs @@ -6,7 +6,7 @@ use anyhow::{Context, Result}; use common::{ message::Message, - types::{ContainerConfig, Request}, + types::{ContainerConfig, TaskRequest}, }; use runtimes::RuntimeHandlerManager; use tokio::sync::mpsc::channel; @@ -18,7 +18,7 @@ async fn real_main() { let (sender, _receiver) = channel::(MESSAGE_BUFFER_SIZE); let manager = RuntimeHandlerManager::new("xxx", sender).unwrap(); - let req = Request::CreateContainer(ContainerConfig { + let req = TaskRequest::CreateContainer(ContainerConfig { container_id: "xxx".to_owned(), bundle: ".".to_owned(), rootfs_mounts: Vec::new(),