mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 04:04:45 +00:00
agent: simplify error handling
We extend the `Result` and `Option` types with associated types that allows converting a `Result<T, E>` and `Option<T>` into `ttrpc::Result<T>`. This allows the elimination of many `match` statements in favor of calling the map function plus the `?` operator. This transformation simplifies the code. Fixes: #7624 Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
parent
e107d1d94e
commit
76dac8f22c
@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
|
|||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use std::ffi::{CString, OsStr};
|
use std::ffi::{CString, OsStr};
|
||||||
|
use std::fmt::Debug;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os::unix::ffi::OsStrExt;
|
use std::os::unix::ffi::OsStrExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@ -121,7 +122,7 @@ fn sl() -> slog::Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convenience function to wrap an error and response to ttrpc client
|
// Convenience function to wrap an error and response to ttrpc client
|
||||||
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error {
|
fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
|
||||||
get_rpc_status(code, format!("{:?}", err))
|
get_rpc_status(code, format!("{:?}", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result
|
|||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn same<E>(e: E) -> E {
|
||||||
|
e
|
||||||
|
}
|
||||||
|
|
||||||
|
trait ResultToTtrpcResult<T, E: Debug>: Sized {
|
||||||
|
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T>;
|
||||||
|
fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result<T> {
|
||||||
|
self.map_ttrpc_err(|e| {
|
||||||
|
doer(&e);
|
||||||
|
e
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E: Debug> ResultToTtrpcResult<T, E> for Result<T, E> {
|
||||||
|
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T> {
|
||||||
|
self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait OptionToTtrpcResult<T>: Sized {
|
||||||
|
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> OptionToTtrpcResult<T> for Option<T> {
|
||||||
|
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T> {
|
||||||
|
self.ok_or_else(|| ttrpc_error(code, msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct AgentService {
|
pub struct AgentService {
|
||||||
sandbox: Arc<Mutex<Sandbox>>,
|
sandbox: Arc<Mutex<Sandbox>>,
|
||||||
@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "create_container", req);
|
trace_rpc_call!(ctx, "create_container", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
match self.do_create_container(req).await {
|
self.do_create_container(req).await.map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Ok(Empty::new())
|
||||||
Ok(_) => Ok(Empty::new()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_container(
|
async fn start_container(
|
||||||
@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "start_container", req);
|
trace_rpc_call!(ctx, "start_container", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
match self.do_start_container(req).await {
|
self.do_start_container(req).await.map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Ok(Empty::new())
|
||||||
Ok(_) => Ok(Empty::new()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_container(
|
async fn remove_container(
|
||||||
@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "remove_container", req);
|
trace_rpc_call!(ctx, "remove_container", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
match self.do_remove_container(req).await {
|
self.do_remove_container(req).await.map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Ok(Empty::new())
|
||||||
Ok(_) => Ok(Empty::new()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn exec_process(
|
async fn exec_process(
|
||||||
@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "exec_process", req);
|
trace_rpc_call!(ctx, "exec_process", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
match self.do_exec_process(req).await {
|
self.do_exec_process(req).await.map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Ok(Empty::new())
|
||||||
Ok(_) => Ok(Empty::new()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn signal_process(
|
async fn signal_process(
|
||||||
@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "signal_process", req);
|
trace_rpc_call!(ctx, "signal_process", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
match self.do_signal_process(req).await {
|
self.do_signal_process(req).await.map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Ok(Empty::new())
|
||||||
Ok(_) => Ok(Empty::new()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_process(
|
async fn wait_process(
|
||||||
@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<WaitProcessResponse> {
|
) -> ttrpc::Result<WaitProcessResponse> {
|
||||||
trace_rpc_call!(ctx, "wait_process", req);
|
trace_rpc_call!(ctx, "wait_process", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
self.do_wait_process(req)
|
self.do_wait_process(req).await.map_ttrpc_err(same)
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_container(
|
async fn update_container(
|
||||||
@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
let ctr = sandbox
|
||||||
ttrpc_error(
|
.get_container(&req.container_id)
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||||
"invalid container id".to_string(),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if let Some(res) = req.resources.as_ref() {
|
if let Some(res) = req.resources.as_ref() {
|
||||||
let oci_res = rustjail::resources_grpc_to_oci(res);
|
let oci_res = rustjail::resources_grpc_to_oci(res);
|
||||||
if let Err(e) = ctr.set(oci_res) {
|
ctr.set(oci_res).map_ttrpc_err(same)?;
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
let ctr = sandbox
|
||||||
ttrpc_error(
|
.get_container(&req.container_id)
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||||
"invalid container id".to_string(),
|
ctr.stats().map_ttrpc_err(same)
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
ctr.stats()
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pause_container(
|
async fn pause_container(
|
||||||
@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
let ctr = sandbox
|
||||||
ttrpc_error(
|
.get_container(&req.container_id)
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||||
"invalid container id".to_string(),
|
ctr.pause().map_ttrpc_err(same)?;
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
ctr.pause()
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
let ctr = sandbox
|
||||||
ttrpc_error(
|
.get_container(&req.container_id)
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||||
"invalid container id".to_string(),
|
ctr.resume().map_ttrpc_err(same)?;
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
ctr.resume()
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
|
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
let mount_infos = parse_mount_table("/proc/self/mountinfo")
|
let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?;
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
for m in &mount_infos {
|
for m in &mount_infos {
|
||||||
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
|
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
|
||||||
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
|
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
|
||||||
@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
req: protocols::agent::WriteStreamRequest,
|
req: protocols::agent::WriteStreamRequest,
|
||||||
) -> ttrpc::Result<WriteStreamResponse> {
|
) -> ttrpc::Result<WriteStreamResponse> {
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
self.do_write_stream(req)
|
self.do_write_stream(req).await.map_ttrpc_err(same)
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_stdout(
|
async fn read_stdout(
|
||||||
@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
req: protocols::agent::ReadStreamRequest,
|
req: protocols::agent::ReadStreamRequest,
|
||||||
) -> ttrpc::Result<ReadStreamResponse> {
|
) -> ttrpc::Result<ReadStreamResponse> {
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
self.do_read_stream(req, true)
|
self.do_read_stream(req, true).await.map_ttrpc_err(same)
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_stderr(
|
async fn read_stderr(
|
||||||
@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
req: protocols::agent::ReadStreamRequest,
|
req: protocols::agent::ReadStreamRequest,
|
||||||
) -> ttrpc::Result<ReadStreamResponse> {
|
) -> ttrpc::Result<ReadStreamResponse> {
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
self.do_read_stream(req, false)
|
self.do_read_stream(req, false).await.map_ttrpc_err(same)
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn close_stdin(
|
async fn close_stdin(
|
||||||
@ -902,23 +891,20 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if let Some(fd) = p.term_master {
|
let fd = p
|
||||||
unsafe {
|
.term_master
|
||||||
let win = winsize {
|
.map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?;
|
||||||
ws_row: req.row as c_ushort,
|
let win = winsize {
|
||||||
ws_col: req.column as c_ushort,
|
ws_row: req.row as c_ushort,
|
||||||
ws_xpixel: 0,
|
ws_col: req.column as c_ushort,
|
||||||
ws_ypixel: 0,
|
ws_xpixel: 0,
|
||||||
};
|
ws_ypixel: 0,
|
||||||
|
};
|
||||||
|
|
||||||
let err = libc::ioctl(fd, TIOCSWINSZ, &win);
|
let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) };
|
||||||
Errno::result(err).map(drop).map_err(|e| {
|
Errno::result(err)
|
||||||
ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
|
.map(drop)
|
||||||
})?;
|
.map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?;
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "update_interface", req);
|
trace_rpc_call!(ctx, "update_interface", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let interface = req.interface.into_option().ok_or_else(|| {
|
let interface = req.interface.into_option().map_ttrpc_err(
|
||||||
ttrpc_error(
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
"empty update interface request",
|
||||||
"empty update interface request".to_string(),
|
)?;
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
self.sandbox
|
self.sandbox
|
||||||
.lock()
|
.lock()
|
||||||
@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.rtnl
|
.rtnl
|
||||||
.update_interface(&interface)
|
.update_interface(&interface)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_ttrpc_err(|e| format!("update interface: {:?}", e))?;
|
||||||
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(interface)
|
Ok(interface)
|
||||||
}
|
}
|
||||||
@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "update_routes", req);
|
trace_rpc_call!(ctx, "update_routes", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| {
|
let new_routes = req
|
||||||
ttrpc_error(
|
.routes
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
.into_option()
|
||||||
"empty update routes request".to_string(),
|
.map(|r| r.Routes)
|
||||||
)
|
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
|
||||||
sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
|
sandbox
|
||||||
ttrpc_error(
|
.rtnl
|
||||||
ttrpc::Code::INTERNAL,
|
.update_routes(new_routes)
|
||||||
format!("Failed to update routes: {:?}", e),
|
.await
|
||||||
)
|
.map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let list = sandbox.rtnl.list_routes().await.map_err(|e| {
|
let list = sandbox
|
||||||
ttrpc_error(
|
.rtnl
|
||||||
ttrpc::Code::INTERNAL,
|
.list_routes()
|
||||||
format!("Failed to list routes after update: {:?}", e),
|
.await
|
||||||
)
|
.map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(protocols::agent::Routes {
|
Ok(protocols::agent::Routes {
|
||||||
Routes: list,
|
Routes: list,
|
||||||
@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "update_mounts", req);
|
trace_rpc_call!(ctx, "update_mounts", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await {
|
update_ephemeral_mounts(sl(), &req.storages, &self.sandbox)
|
||||||
Ok(_) => Ok(Empty::new()),
|
.await
|
||||||
Err(e) => Err(ttrpc_error(
|
.map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?;
|
||||||
ttrpc::Code::INTERNAL,
|
Ok(Empty::new())
|
||||||
format!("Failed to update mounts: {:?}", e),
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_ip_tables(
|
async fn get_ip_tables(
|
||||||
@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
}
|
}
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
match Command::new(cmd.clone()).output() {
|
let output = Command::new(cmd.clone())
|
||||||
Ok(output) => Ok(GetIPTablesResponse {
|
.output()
|
||||||
data: output.stdout,
|
.map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?;
|
||||||
..Default::default()
|
Ok(GetIPTablesResponse {
|
||||||
}),
|
data: output.stdout,
|
||||||
Err(e) => {
|
..Default::default()
|
||||||
warn!(sl(), "failed to run {}: {:?}", cmd, e.kind());
|
})
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn set_ip_tables(
|
async fn set_ip_tables(
|
||||||
@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
}
|
}
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
let mut child = match Command::new(cmd.clone())
|
let mut child = Command::new(cmd.clone())
|
||||||
.arg("--wait")
|
.arg("--wait")
|
||||||
.arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
|
.arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
|
||||||
.stdin(Stdio::piped())
|
.stdin(Stdio::piped())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
.spawn()
|
.spawn()
|
||||||
{
|
.map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?;
|
||||||
Ok(child) => child,
|
|
||||||
Err(e) => {
|
|
||||||
warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
|
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stdin = match child.stdin.take() {
|
let mut stdin = match child.stdin.take() {
|
||||||
Some(si) => si,
|
Some(si) => si,
|
||||||
@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
println!("failed to get stdin from child");
|
println!("failed to get stdin from child");
|
||||||
return Err(ttrpc_error(
|
return Err(ttrpc_error(
|
||||||
ttrpc::Code::INTERNAL,
|
ttrpc::Code::INTERNAL,
|
||||||
"failed to take stdin from child".to_string(),
|
"failed to take stdin from child",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
|
let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?;
|
||||||
{
|
|
||||||
return Err(ttrpc_error(
|
|
||||||
ttrpc::Code::INTERNAL,
|
|
||||||
"timeout waiting for stdin writer to complete".to_string(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if handle.await.is_err() {
|
handle
|
||||||
return Err(ttrpc_error(
|
.await
|
||||||
ttrpc::Code::INTERNAL,
|
.map_ttrpc_err(|_| "stdin writer thread failure")?;
|
||||||
"stdin writer thread failure".to_string(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let output = match child.wait_with_output() {
|
let output = child.wait_with_output().map_ttrpc_err_do(|e| {
|
||||||
Ok(o) => o,
|
warn!(
|
||||||
Err(e) => {
|
sl(),
|
||||||
warn!(
|
"failure waiting for spawned {} to complete: {:?}",
|
||||||
sl(),
|
cmd,
|
||||||
"failure waiting for spawned {} to complete: {:?}",
|
e.kind()
|
||||||
cmd,
|
)
|
||||||
e.kind()
|
})?;
|
||||||
);
|
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
|
warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
|
||||||
@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.rtnl
|
.rtnl
|
||||||
.list_interfaces()
|
.list_interfaces()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?;
|
||||||
ttrpc_error(
|
|
||||||
ttrpc::Code::INTERNAL,
|
|
||||||
format!("Failed to list interfaces: {:?}", e),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(protocols::agent::Interfaces {
|
Ok(protocols::agent::Interfaces {
|
||||||
Interfaces: list,
|
Interfaces: list,
|
||||||
@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.rtnl
|
.rtnl
|
||||||
.list_routes()
|
.list_routes()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
|
.map_ttrpc_err(|e| format!("list routes: {:?}", e))?;
|
||||||
|
|
||||||
Ok(protocols::agent::Routes {
|
Ok(protocols::agent::Routes {
|
||||||
Routes: list,
|
Routes: list,
|
||||||
@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for m in req.kernel_modules.iter() {
|
for m in req.kernel_modules.iter() {
|
||||||
load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
load_kernel_module(m).map_ttrpc_err(same)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
s.setup_shared_namespaces()
|
s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
match add_storages(sl(), &req.storages, &self.sandbox, None).await {
|
let m = add_storages(sl(), &req.storages, &self.sandbox, None)
|
||||||
Ok(m) => {
|
.await
|
||||||
self.sandbox.lock().await.mounts = m;
|
.map_ttrpc_err(same)?;
|
||||||
}
|
self.sandbox.lock().await.mounts = m;
|
||||||
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match setup_guest_dns(sl(), &req.dns) {
|
setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?;
|
||||||
Ok(_) => {
|
{
|
||||||
let mut s = self.sandbox.lock().await;
|
let mut s = self.sandbox.lock().await;
|
||||||
for dns in req.dns {
|
for dns in req.dns {
|
||||||
s.network.set_dns(dns);
|
s.network.set_dns(dns);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
}
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
// destroy all containers, clean up, notify agent to exit etc.
|
// destroy all containers, clean up, notify agent to exit etc.
|
||||||
sandbox
|
sandbox.destroy().await.map_ttrpc_err(same)?;
|
||||||
.destroy()
|
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
// Close get_oom_event connection,
|
// Close get_oom_event connection,
|
||||||
// otherwise it will block the shutdown of ttrpc.
|
// otherwise it will block the shutdown of ttrpc.
|
||||||
drop(sandbox.event_tx.take());
|
drop(sandbox.event_tx.take());
|
||||||
@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
sandbox
|
sandbox
|
||||||
.sender
|
.sender
|
||||||
.take()
|
.take()
|
||||||
.ok_or_else(|| {
|
.map_ttrpc_err(
|
||||||
ttrpc_error(
|
ttrpc::Code::INTERNAL,
|
||||||
ttrpc::Code::INTERNAL,
|
"failed to get sandbox sender channel",
|
||||||
"failed to get sandbox sender channel".to_string(),
|
)?
|
||||||
)
|
|
||||||
})?
|
|
||||||
.send(1)
|
.send(1)
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_ttrpc_err(same)?;
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.neighbors
|
.neighbors
|
||||||
.into_option()
|
.into_option()
|
||||||
.map(|n| n.ARPNeighbors)
|
.map(|n| n.ARPNeighbors)
|
||||||
.ok_or_else(|| {
|
.map_ttrpc_err(
|
||||||
ttrpc_error(
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
"empty add arp neighbours request",
|
||||||
"empty add arp neighbours request".to_string(),
|
)?;
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
self.sandbox
|
self.sandbox
|
||||||
.lock()
|
.lock()
|
||||||
@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.rtnl
|
.rtnl
|
||||||
.add_arp_neighbors(neighs)
|
.add_arp_neighbors(neighs)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| {
|
.map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?;
|
||||||
ttrpc_error(
|
|
||||||
ttrpc::Code::INTERNAL,
|
|
||||||
format!("Failed to add ARP neighbours: {:?}", e),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
let sandbox = self.sandbox.lock().await;
|
let sandbox = self.sandbox.lock().await;
|
||||||
|
|
||||||
sandbox
|
sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?;
|
||||||
.online_cpu_memory(&req)
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
random::reseed_rng(req.data.as_slice())
|
random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?;
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
info!(sl(), "get guest details!");
|
info!(sl(), "get guest details!");
|
||||||
let mut resp = GuestDetailsResponse::new();
|
let mut resp = GuestDetailsResponse::new();
|
||||||
// to get memory block size
|
// to get memory block size
|
||||||
match get_memory_info(
|
let (u, v) = get_memory_info(
|
||||||
req.mem_block_size,
|
req.mem_block_size,
|
||||||
req.mem_hotplug_probe,
|
req.mem_hotplug_probe,
|
||||||
SYSFS_MEMORY_BLOCK_SIZE_PATH,
|
SYSFS_MEMORY_BLOCK_SIZE_PATH,
|
||||||
SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
|
SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
|
||||||
) {
|
)
|
||||||
Ok((u, v)) => {
|
.map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?;
|
||||||
resp.mem_block_size_bytes = u;
|
|
||||||
resp.support_mem_hotplug_probe = v;
|
resp.mem_block_size_bytes = u;
|
||||||
}
|
resp.support_mem_hotplug_probe = v;
|
||||||
Err(e) => {
|
|
||||||
info!(sl(), "fail to get memory info!");
|
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// to get agent details
|
// to get agent details
|
||||||
let detail = get_agent_details();
|
let detail = get_agent_details();
|
||||||
@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?;
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
do_set_guest_date_time(req.Sec, req.Usec)
|
do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?;
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "copy_file", req);
|
trace_rpc_call!(ctx, "copy_file", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
do_copy_file(&req).map_ttrpc_err(same)?;
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1450,14 +1371,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "get_metrics", req);
|
trace_rpc_call!(ctx, "get_metrics", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
match get_metrics(&req) {
|
let s = get_metrics(&req).map_ttrpc_err(same)?;
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
let mut metrics = Metrics::new();
|
||||||
Ok(s) => {
|
metrics.set_metrics(s);
|
||||||
let mut metrics = Metrics::new();
|
Ok(metrics)
|
||||||
metrics.set_metrics(s);
|
|
||||||
Ok(metrics)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_oom_event(
|
async fn get_oom_event(
|
||||||
@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
let mut event_rx = event_rx.lock().await;
|
let mut event_rx = event_rx.lock().await;
|
||||||
drop(s);
|
drop(s);
|
||||||
|
|
||||||
if let Some(container_id) = event_rx.recv().await {
|
let container_id = event_rx
|
||||||
info!(sl(), "get_oom_event return {}", &container_id);
|
.recv()
|
||||||
|
.await
|
||||||
|
.map_ttrpc_err(ttrpc::Code::INTERNAL, "")?;
|
||||||
|
|
||||||
let mut resp = OOMEvent::new();
|
info!(sl(), "get_oom_event return {}", &container_id);
|
||||||
resp.container_id = container_id;
|
|
||||||
|
|
||||||
return Ok(resp);
|
let mut resp = OOMEvent::new();
|
||||||
}
|
resp.container_id = container_id;
|
||||||
|
Ok(resp)
|
||||||
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_volume_stats(
|
async fn get_volume_stats(
|
||||||
@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
let mut resp = VolumeStatsResponse::new();
|
let mut resp = VolumeStatsResponse::new();
|
||||||
let mut condition = VolumeCondition::new();
|
let mut condition = VolumeCondition::new();
|
||||||
|
|
||||||
match File::open(&req.volume_guest_path) {
|
File::open(&req.volume_guest_path)
|
||||||
Ok(_) => {
|
.map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?;
|
||||||
condition.abnormal = false;
|
|
||||||
condition.message = String::from("OK");
|
condition.abnormal = false;
|
||||||
}
|
condition.message = String::from("OK");
|
||||||
Err(e) => {
|
|
||||||
info!(sl(), "failed to open the volume");
|
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut usage_vec = Vec::new();
|
let mut usage_vec = Vec::new();
|
||||||
|
|
||||||
// to get volume capacity stats
|
// to get volume capacity stats
|
||||||
get_volume_capacity_stats(&req.volume_guest_path)
|
get_volume_capacity_stats(&req.volume_guest_path)
|
||||||
.map(|u| usage_vec.push(u))
|
.map(|u| usage_vec.push(u))
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_ttrpc_err(same)?;
|
||||||
|
|
||||||
// to get volume inode stats
|
// to get volume inode stats
|
||||||
get_volume_inode_stats(&req.volume_guest_path)
|
get_volume_inode_stats(&req.volume_guest_path)
|
||||||
.map(|u| usage_vec.push(u))
|
.map(|u| usage_vec.push(u))
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_ttrpc_err(same)?;
|
||||||
|
|
||||||
resp.usage = usage_vec;
|
resp.usage = usage_vec;
|
||||||
resp.volume_condition = MessageField::some(condition);
|
resp.volume_condition = MessageField::some(condition);
|
||||||
@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "add_swap", req);
|
trace_rpc_call!(ctx, "add_swap", req);
|
||||||
is_allowed(&req).await?;
|
is_allowed(&req).await?;
|
||||||
|
|
||||||
do_add_swap(&self.sandbox, &req)
|
do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?;
|
||||||
.await
|
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.await
|
.await
|
||||||
.set_policy(&req.policy)
|
.set_policy(&req.policy)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_ttrpc_err(same)?;
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user