agent: simplify error handling

We extend the `Result` and `Option` types with associated types that
allows converting a `Result<T, E>` and `Option<T>` into
`ttrpc::Result<T>`.

This allows the elimination of many `match` statements in favor of
calling the map function plus the `?` operator. This transformation
simplifies the code.

Fixes: #7624

Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
Wedson Almeida Filho 2023-08-03 22:08:09 -03:00
parent e107d1d94e
commit 76dac8f22c

View File

@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use std::ffi::{CString, OsStr}; use std::ffi::{CString, OsStr};
use std::fmt::Debug;
use std::io; use std::io;
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::path::Path; use std::path::Path;
@ -121,7 +122,7 @@ fn sl() -> slog::Logger {
} }
// Convenience function to wrap an error and response to ttrpc client // Convenience function to wrap an error and response to ttrpc client
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error { fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
get_rpc_status(code, format!("{:?}", err)) get_rpc_status(code, format!("{:?}", err))
} }
@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result
res res
} }
fn same<E>(e: E) -> E {
e
}
trait ResultToTtrpcResult<T, E: Debug>: Sized {
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T>;
fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result<T> {
self.map_ttrpc_err(|e| {
doer(&e);
e
})
}
}
impl<T, E: Debug> ResultToTtrpcResult<T, E> for Result<T, E> {
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T> {
self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e)))
}
}
trait OptionToTtrpcResult<T>: Sized {
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T>;
}
impl<T> OptionToTtrpcResult<T> for Option<T> {
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T> {
self.ok_or_else(|| ttrpc_error(code, msg))
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AgentService { pub struct AgentService {
sandbox: Arc<Mutex<Sandbox>>, sandbox: Arc<Mutex<Sandbox>>,
@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_container", req); trace_rpc_call!(ctx, "create_container", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match self.do_create_container(req).await { self.do_create_container(req).await.map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(Empty::new())
Ok(_) => Ok(Empty::new()),
}
} }
async fn start_container( async fn start_container(
@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "start_container", req); trace_rpc_call!(ctx, "start_container", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match self.do_start_container(req).await { self.do_start_container(req).await.map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(Empty::new())
Ok(_) => Ok(Empty::new()),
}
} }
async fn remove_container( async fn remove_container(
@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_container", req); trace_rpc_call!(ctx, "remove_container", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match self.do_remove_container(req).await { self.do_remove_container(req).await.map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(Empty::new())
Ok(_) => Ok(Empty::new()),
}
} }
async fn exec_process( async fn exec_process(
@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "exec_process", req); trace_rpc_call!(ctx, "exec_process", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match self.do_exec_process(req).await { self.do_exec_process(req).await.map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(Empty::new())
Ok(_) => Ok(Empty::new()),
}
} }
async fn signal_process( async fn signal_process(
@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "signal_process", req); trace_rpc_call!(ctx, "signal_process", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match self.do_signal_process(req).await { self.do_signal_process(req).await.map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(Empty::new())
Ok(_) => Ok(Empty::new()),
}
} }
async fn wait_process( async fn wait_process(
@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<WaitProcessResponse> { ) -> ttrpc::Result<WaitProcessResponse> {
trace_rpc_call!(ctx, "wait_process", req); trace_rpc_call!(ctx, "wait_process", req);
is_allowed(&req).await?; is_allowed(&req).await?;
self.do_wait_process(req) self.do_wait_process(req).await.map_ttrpc_err(same)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
} }
async fn update_container( async fn update_container(
@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?; is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { let ctr = sandbox
ttrpc_error( .get_container(&req.container_id)
ttrpc::Code::INVALID_ARGUMENT, .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
"invalid container id".to_string(),
)
})?;
if let Some(res) = req.resources.as_ref() { if let Some(res) = req.resources.as_ref() {
let oci_res = rustjail::resources_grpc_to_oci(res); let oci_res = rustjail::resources_grpc_to_oci(res);
if let Err(e) = ctr.set(oci_res) { ctr.set(oci_res).map_ttrpc_err(same)?;
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
} }
Ok(Empty::new()) Ok(Empty::new())
@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?; is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { let ctr = sandbox
ttrpc_error( .get_container(&req.container_id)
ttrpc::Code::INVALID_ARGUMENT, .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
"invalid container id".to_string(), ctr.stats().map_ttrpc_err(same)
)
})?;
ctr.stats()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
} }
async fn pause_container( async fn pause_container(
@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?; is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { let ctr = sandbox
ttrpc_error( .get_container(&req.container_id)
ttrpc::Code::INVALID_ARGUMENT, .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
"invalid container id".to_string(), ctr.pause().map_ttrpc_err(same)?;
)
})?;
ctr.pause()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?; is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { let ctr = sandbox
ttrpc_error( .get_container(&req.container_id)
ttrpc::Code::INVALID_ARGUMENT, .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
"invalid container id".to_string(), ctr.resume().map_ttrpc_err(same)?;
)
})?;
ctr.resume()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
is_allowed(&req).await?; is_allowed(&req).await?;
let mount_infos = parse_mount_table("/proc/self/mountinfo") let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
for m in &mount_infos { for m in &mount_infos {
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) { if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more. // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::WriteStreamRequest, req: protocols::agent::WriteStreamRequest,
) -> ttrpc::Result<WriteStreamResponse> { ) -> ttrpc::Result<WriteStreamResponse> {
is_allowed(&req).await?; is_allowed(&req).await?;
self.do_write_stream(req) self.do_write_stream(req).await.map_ttrpc_err(same)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
} }
async fn read_stdout( async fn read_stdout(
@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::ReadStreamRequest, req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> { ) -> ttrpc::Result<ReadStreamResponse> {
is_allowed(&req).await?; is_allowed(&req).await?;
self.do_read_stream(req, true) self.do_read_stream(req, true).await.map_ttrpc_err(same)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
} }
async fn read_stderr( async fn read_stderr(
@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::ReadStreamRequest, req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> { ) -> ttrpc::Result<ReadStreamResponse> {
is_allowed(&req).await?; is_allowed(&req).await?;
self.do_read_stream(req, false) self.do_read_stream(req, false).await.map_ttrpc_err(same)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
} }
async fn close_stdin( async fn close_stdin(
@ -902,23 +891,20 @@ impl agent_ttrpc::AgentService for AgentService {
) )
})?; })?;
if let Some(fd) = p.term_master { let fd = p
unsafe { .term_master
let win = winsize { .map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?;
ws_row: req.row as c_ushort, let win = winsize {
ws_col: req.column as c_ushort, ws_row: req.row as c_ushort,
ws_xpixel: 0, ws_col: req.column as c_ushort,
ws_ypixel: 0, ws_xpixel: 0,
}; ws_ypixel: 0,
};
let err = libc::ioctl(fd, TIOCSWINSZ, &win); let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) };
Errno::result(err).map(drop).map_err(|e| { Errno::result(err)
ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) .map(drop)
})?; .map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?;
}
} else {
return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
}
Ok(Empty::new()) Ok(Empty::new())
} }
@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_interface", req); trace_rpc_call!(ctx, "update_interface", req);
is_allowed(&req).await?; is_allowed(&req).await?;
let interface = req.interface.into_option().ok_or_else(|| { let interface = req.interface.into_option().map_ttrpc_err(
ttrpc_error( ttrpc::Code::INVALID_ARGUMENT,
ttrpc::Code::INVALID_ARGUMENT, "empty update interface request",
"empty update interface request".to_string(), )?;
)
})?;
self.sandbox self.sandbox
.lock() .lock()
@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl .rtnl
.update_interface(&interface) .update_interface(&interface)
.await .await
.map_err(|e| { .map_ttrpc_err(|e| format!("update interface: {:?}", e))?;
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
})?;
Ok(interface) Ok(interface)
} }
@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_routes", req); trace_rpc_call!(ctx, "update_routes", req);
is_allowed(&req).await?; is_allowed(&req).await?;
let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { let new_routes = req
ttrpc_error( .routes
ttrpc::Code::INVALID_ARGUMENT, .into_option()
"empty update routes request".to_string(), .map(|r| r.Routes)
) .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?;
})?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { sandbox
ttrpc_error( .rtnl
ttrpc::Code::INTERNAL, .update_routes(new_routes)
format!("Failed to update routes: {:?}", e), .await
) .map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?;
})?;
let list = sandbox.rtnl.list_routes().await.map_err(|e| { let list = sandbox
ttrpc_error( .rtnl
ttrpc::Code::INTERNAL, .list_routes()
format!("Failed to list routes after update: {:?}", e), .await
) .map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?;
})?;
Ok(protocols::agent::Routes { Ok(protocols::agent::Routes {
Routes: list, Routes: list,
@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_mounts", req); trace_rpc_call!(ctx, "update_mounts", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await { update_ephemeral_mounts(sl(), &req.storages, &self.sandbox)
Ok(_) => Ok(Empty::new()), .await
Err(e) => Err(ttrpc_error( .map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?;
ttrpc::Code::INTERNAL, Ok(Empty::new())
format!("Failed to update mounts: {:?}", e),
)),
}
} }
async fn get_ip_tables( async fn get_ip_tables(
@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService {
} }
.to_string(); .to_string();
match Command::new(cmd.clone()).output() { let output = Command::new(cmd.clone())
Ok(output) => Ok(GetIPTablesResponse { .output()
data: output.stdout, .map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?;
..Default::default() Ok(GetIPTablesResponse {
}), data: output.stdout,
Err(e) => { ..Default::default()
warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()); })
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
} }
async fn set_ip_tables( async fn set_ip_tables(
@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService {
} }
.to_string(); .to_string();
let mut child = match Command::new(cmd.clone()) let mut child = Command::new(cmd.clone())
.arg("--wait") .arg("--wait")
.arg(IPTABLES_RESTORE_WAIT_SEC.to_string()) .arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn() .spawn()
{ .map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?;
Ok(child) => child,
Err(e) => {
warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
let mut stdin = match child.stdin.take() { let mut stdin = match child.stdin.take() {
Some(si) => si, Some(si) => si,
@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService {
println!("failed to get stdin from child"); println!("failed to get stdin from child");
return Err(ttrpc_error( return Err(ttrpc_error(
ttrpc::Code::INTERNAL, ttrpc::Code::INTERNAL,
"failed to take stdin from child".to_string(), "failed to take stdin from child",
)); ));
} }
}; };
@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService {
}; };
}); });
if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx) let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
.await .await
.is_err() .map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?;
{
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"timeout waiting for stdin writer to complete".to_string(),
));
}
if handle.await.is_err() { handle
return Err(ttrpc_error( .await
ttrpc::Code::INTERNAL, .map_ttrpc_err(|_| "stdin writer thread failure")?;
"stdin writer thread failure".to_string(),
));
}
let output = match child.wait_with_output() { let output = child.wait_with_output().map_ttrpc_err_do(|e| {
Ok(o) => o, warn!(
Err(e) => { sl(),
warn!( "failure waiting for spawned {} to complete: {:?}",
sl(), cmd,
"failure waiting for spawned {} to complete: {:?}", e.kind()
cmd, )
e.kind() })?;
);
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
if !output.status.success() { if !output.status.success() {
warn!(sl(), "{} failed: {:?}", cmd, output.stderr); warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl .rtnl
.list_interfaces() .list_interfaces()
.await .await
.map_err(|e| { .map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?;
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list interfaces: {:?}", e),
)
})?;
Ok(protocols::agent::Interfaces { Ok(protocols::agent::Interfaces {
Interfaces: list, Interfaces: list,
@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl .rtnl
.list_routes() .list_routes()
.await .await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; .map_ttrpc_err(|e| format!("list routes: {:?}", e))?;
Ok(protocols::agent::Routes { Ok(protocols::agent::Routes {
Routes: list, Routes: list,
@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService {
} }
for m in req.kernel_modules.iter() { for m in req.kernel_modules.iter() {
load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; load_kernel_module(m).map_ttrpc_err(same)?;
} }
s.setup_shared_namespaces() s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
} }
match add_storages(sl(), &req.storages, &self.sandbox, None).await { let m = add_storages(sl(), &req.storages, &self.sandbox, None)
Ok(m) => { .await
self.sandbox.lock().await.mounts = m; .map_ttrpc_err(same)?;
} self.sandbox.lock().await.mounts = m;
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
match setup_guest_dns(sl(), &req.dns) { setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?;
Ok(_) => { {
let mut s = self.sandbox.lock().await; let mut s = self.sandbox.lock().await;
for dns in req.dns { for dns in req.dns {
s.network.set_dns(dns); s.network.set_dns(dns);
}
} }
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }
};
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
// destroy all containers, clean up, notify agent to exit etc. // destroy all containers, clean up, notify agent to exit etc.
sandbox sandbox.destroy().await.map_ttrpc_err(same)?;
.destroy()
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
// Close get_oom_event connection, // Close get_oom_event connection,
// otherwise it will block the shutdown of ttrpc. // otherwise it will block the shutdown of ttrpc.
drop(sandbox.event_tx.take()); drop(sandbox.event_tx.take());
@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService {
sandbox sandbox
.sender .sender
.take() .take()
.ok_or_else(|| { .map_ttrpc_err(
ttrpc_error( ttrpc::Code::INTERNAL,
ttrpc::Code::INTERNAL, "failed to get sandbox sender channel",
"failed to get sandbox sender channel".to_string(), )?
)
})?
.send(1) .send(1)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; .map_ttrpc_err(same)?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService {
.neighbors .neighbors
.into_option() .into_option()
.map(|n| n.ARPNeighbors) .map(|n| n.ARPNeighbors)
.ok_or_else(|| { .map_ttrpc_err(
ttrpc_error( ttrpc::Code::INVALID_ARGUMENT,
ttrpc::Code::INVALID_ARGUMENT, "empty add arp neighbours request",
"empty add arp neighbours request".to_string(), )?;
)
})?;
self.sandbox self.sandbox
.lock() .lock()
@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl .rtnl
.add_arp_neighbors(neighs) .add_arp_neighbors(neighs)
.await .await
.map_err(|e| { .map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?;
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to add ARP neighbours: {:?}", e),
)
})?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?; is_allowed(&req).await?;
let sandbox = self.sandbox.lock().await; let sandbox = self.sandbox.lock().await;
sandbox sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?;
.online_cpu_memory(&req)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "reseed_random_dev", req); trace_rpc_call!(ctx, "reseed_random_dev", req);
is_allowed(&req).await?; is_allowed(&req).await?;
random::reseed_rng(req.data.as_slice()) random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService {
info!(sl(), "get guest details!"); info!(sl(), "get guest details!");
let mut resp = GuestDetailsResponse::new(); let mut resp = GuestDetailsResponse::new();
// to get memory block size // to get memory block size
match get_memory_info( let (u, v) = get_memory_info(
req.mem_block_size, req.mem_block_size,
req.mem_hotplug_probe, req.mem_hotplug_probe,
SYSFS_MEMORY_BLOCK_SIZE_PATH, SYSFS_MEMORY_BLOCK_SIZE_PATH,
SYSFS_MEMORY_HOTPLUG_PROBE_PATH, SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
) { )
Ok((u, v)) => { .map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?;
resp.mem_block_size_bytes = u;
resp.support_mem_hotplug_probe = v; resp.mem_block_size_bytes = u;
} resp.support_mem_hotplug_probe = v;
Err(e) => {
info!(sl(), "fail to get memory info!");
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
// to get agent details // to get agent details
let detail = get_agent_details(); let detail = get_agent_details();
@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
is_allowed(&req).await?; is_allowed(&req).await?;
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "set_guest_date_time", req); trace_rpc_call!(ctx, "set_guest_date_time", req);
is_allowed(&req).await?; is_allowed(&req).await?;
do_set_guest_date_time(req.Sec, req.Usec) do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "copy_file", req); trace_rpc_call!(ctx, "copy_file", req);
is_allowed(&req).await?; is_allowed(&req).await?;
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; do_copy_file(&req).map_ttrpc_err(same)?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1450,14 +1371,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "get_metrics", req); trace_rpc_call!(ctx, "get_metrics", req);
is_allowed(&req).await?; is_allowed(&req).await?;
match get_metrics(&req) { let s = get_metrics(&req).map_ttrpc_err(same)?;
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), let mut metrics = Metrics::new();
Ok(s) => { metrics.set_metrics(s);
let mut metrics = Metrics::new(); Ok(metrics)
metrics.set_metrics(s);
Ok(metrics)
}
}
} }
async fn get_oom_event( async fn get_oom_event(
@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService {
let mut event_rx = event_rx.lock().await; let mut event_rx = event_rx.lock().await;
drop(s); drop(s);
if let Some(container_id) = event_rx.recv().await { let container_id = event_rx
info!(sl(), "get_oom_event return {}", &container_id); .recv()
.await
.map_ttrpc_err(ttrpc::Code::INTERNAL, "")?;
let mut resp = OOMEvent::new(); info!(sl(), "get_oom_event return {}", &container_id);
resp.container_id = container_id;
return Ok(resp); let mut resp = OOMEvent::new();
} resp.container_id = container_id;
Ok(resp)
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
} }
async fn get_volume_stats( async fn get_volume_stats(
@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService {
let mut resp = VolumeStatsResponse::new(); let mut resp = VolumeStatsResponse::new();
let mut condition = VolumeCondition::new(); let mut condition = VolumeCondition::new();
match File::open(&req.volume_guest_path) { File::open(&req.volume_guest_path)
Ok(_) => { .map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?;
condition.abnormal = false;
condition.message = String::from("OK"); condition.abnormal = false;
} condition.message = String::from("OK");
Err(e) => {
info!(sl(), "failed to open the volume");
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
let mut usage_vec = Vec::new(); let mut usage_vec = Vec::new();
// to get volume capacity stats // to get volume capacity stats
get_volume_capacity_stats(&req.volume_guest_path) get_volume_capacity_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u)) .map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; .map_ttrpc_err(same)?;
// to get volume inode stats // to get volume inode stats
get_volume_inode_stats(&req.volume_guest_path) get_volume_inode_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u)) .map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; .map_ttrpc_err(same)?;
resp.usage = usage_vec; resp.usage = usage_vec;
resp.volume_condition = MessageField::some(condition); resp.volume_condition = MessageField::some(condition);
@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "add_swap", req); trace_rpc_call!(ctx, "add_swap", req);
is_allowed(&req).await?; is_allowed(&req).await?;
do_add_swap(&self.sandbox, &req) do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?;
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new()) Ok(Empty::new())
} }
@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService {
.await .await
.set_policy(&req.policy) .set_policy(&req.policy)
.await .await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; .map_ttrpc_err(same)?;
Ok(Empty::new()) Ok(Empty::new())
} }