agent: simplify error handling

We extend the `Result` and `Option` types with associated types that
allows converting a `Result<T, E>` and `Option<T>` into
`ttrpc::Result<T>`.

This allows the elimination of many `match` statements in favor of
calling the map function plus the `?` operator. This transformation
simplifies the code.

Fixes: #7624

Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
Wedson Almeida Filho 2023-08-03 22:08:09 -03:00
parent e107d1d94e
commit 76dac8f22c

View File

@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
use tokio::sync::Mutex;
use std::ffi::{CString, OsStr};
use std::fmt::Debug;
use std::io;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
@ -121,7 +122,7 @@ fn sl() -> slog::Logger {
}
// Convenience function to wrap an error and response to ttrpc client
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error {
fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
get_rpc_status(code, format!("{:?}", err))
}
@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result
res
}
fn same<E>(e: E) -> E {
e
}
trait ResultToTtrpcResult<T, E: Debug>: Sized {
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T>;
fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result<T> {
self.map_ttrpc_err(|e| {
doer(&e);
e
})
}
}
impl<T, E: Debug> ResultToTtrpcResult<T, E> for Result<T, E> {
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T> {
self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e)))
}
}
trait OptionToTtrpcResult<T>: Sized {
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T>;
}
impl<T> OptionToTtrpcResult<T> for Option<T> {
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T> {
self.ok_or_else(|| ttrpc_error(code, msg))
}
}
#[derive(Clone, Debug)]
pub struct AgentService {
sandbox: Arc<Mutex<Sandbox>>,
@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_container", req);
is_allowed(&req).await?;
match self.do_create_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
self.do_create_container(req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
async fn start_container(
@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "start_container", req);
is_allowed(&req).await?;
match self.do_start_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
self.do_start_container(req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
async fn remove_container(
@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_container", req);
is_allowed(&req).await?;
match self.do_remove_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
self.do_remove_container(req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
async fn exec_process(
@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "exec_process", req);
is_allowed(&req).await?;
match self.do_exec_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
self.do_exec_process(req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
async fn signal_process(
@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "signal_process", req);
is_allowed(&req).await?;
match self.do_signal_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
self.do_signal_process(req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
async fn wait_process(
@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<WaitProcessResponse> {
trace_rpc_call!(ctx, "wait_process", req);
is_allowed(&req).await?;
self.do_wait_process(req)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
self.do_wait_process(req).await.map_ttrpc_err(same)
}
async fn update_container(
@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
let ctr = sandbox
.get_container(&req.container_id)
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
if let Some(res) = req.resources.as_ref() {
let oci_res = rustjail::resources_grpc_to_oci(res);
if let Err(e) = ctr.set(oci_res) {
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
ctr.set(oci_res).map_ttrpc_err(same)?;
}
Ok(Empty::new())
@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.stats()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
let ctr = sandbox
.get_container(&req.container_id)
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
ctr.stats().map_ttrpc_err(same)
}
async fn pause_container(
@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.pause()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
let ctr = sandbox
.get_container(&req.container_id)
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
ctr.pause().map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.resume()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
let ctr = sandbox
.get_container(&req.container_id)
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
ctr.resume().map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
is_allowed(&req).await?;
let mount_infos = parse_mount_table("/proc/self/mountinfo")
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?;
for m in &mount_infos {
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::WriteStreamRequest,
) -> ttrpc::Result<WriteStreamResponse> {
is_allowed(&req).await?;
self.do_write_stream(req)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
self.do_write_stream(req).await.map_ttrpc_err(same)
}
async fn read_stdout(
@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> {
is_allowed(&req).await?;
self.do_read_stream(req, true)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
self.do_read_stream(req, true).await.map_ttrpc_err(same)
}
async fn read_stderr(
@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> {
is_allowed(&req).await?;
self.do_read_stream(req, false)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
self.do_read_stream(req, false).await.map_ttrpc_err(same)
}
async fn close_stdin(
@ -902,23 +891,20 @@ impl agent_ttrpc::AgentService for AgentService {
)
})?;
if let Some(fd) = p.term_master {
unsafe {
let win = winsize {
ws_row: req.row as c_ushort,
ws_col: req.column as c_ushort,
ws_xpixel: 0,
ws_ypixel: 0,
};
let fd = p
.term_master
.map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?;
let win = winsize {
ws_row: req.row as c_ushort,
ws_col: req.column as c_ushort,
ws_xpixel: 0,
ws_ypixel: 0,
};
let err = libc::ioctl(fd, TIOCSWINSZ, &win);
Errno::result(err).map(drop).map_err(|e| {
ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
})?;
}
} else {
return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
}
let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) };
Errno::result(err)
.map(drop)
.map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?;
Ok(Empty::new())
}
@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_interface", req);
is_allowed(&req).await?;
let interface = req.interface.into_option().ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty update interface request".to_string(),
)
})?;
let interface = req.interface.into_option().map_ttrpc_err(
ttrpc::Code::INVALID_ARGUMENT,
"empty update interface request",
)?;
self.sandbox
.lock()
@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl
.update_interface(&interface)
.await
.map_err(|e| {
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
})?;
.map_ttrpc_err(|e| format!("update interface: {:?}", e))?;
Ok(interface)
}
@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_routes", req);
is_allowed(&req).await?;
let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty update routes request".to_string(),
)
})?;
let new_routes = req
.routes
.into_option()
.map(|r| r.Routes)
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?;
let mut sandbox = self.sandbox.lock().await;
sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to update routes: {:?}", e),
)
})?;
sandbox
.rtnl
.update_routes(new_routes)
.await
.map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?;
let list = sandbox.rtnl.list_routes().await.map_err(|e| {
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list routes after update: {:?}", e),
)
})?;
let list = sandbox
.rtnl
.list_routes()
.await
.map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?;
Ok(protocols::agent::Routes {
Routes: list,
@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_mounts", req);
is_allowed(&req).await?;
match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await {
Ok(_) => Ok(Empty::new()),
Err(e) => Err(ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to update mounts: {:?}", e),
)),
}
update_ephemeral_mounts(sl(), &req.storages, &self.sandbox)
.await
.map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?;
Ok(Empty::new())
}
async fn get_ip_tables(
@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService {
}
.to_string();
match Command::new(cmd.clone()).output() {
Ok(output) => Ok(GetIPTablesResponse {
data: output.stdout,
..Default::default()
}),
Err(e) => {
warn!(sl(), "failed to run {}: {:?}", cmd, e.kind());
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
let output = Command::new(cmd.clone())
.output()
.map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?;
Ok(GetIPTablesResponse {
data: output.stdout,
..Default::default()
})
}
async fn set_ip_tables(
@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService {
}
.to_string();
let mut child = match Command::new(cmd.clone())
let mut child = Command::new(cmd.clone())
.arg("--wait")
.arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(child) => child,
Err(e) => {
warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
.map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?;
let mut stdin = match child.stdin.take() {
Some(si) => si,
@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService {
println!("failed to get stdin from child");
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"failed to take stdin from child".to_string(),
"failed to take stdin from child",
));
}
};
@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService {
};
});
if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
.await
.is_err()
{
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"timeout waiting for stdin writer to complete".to_string(),
));
}
.map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?;
if handle.await.is_err() {
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"stdin writer thread failure".to_string(),
));
}
handle
.await
.map_ttrpc_err(|_| "stdin writer thread failure")?;
let output = match child.wait_with_output() {
Ok(o) => o,
Err(e) => {
warn!(
sl(),
"failure waiting for spawned {} to complete: {:?}",
cmd,
e.kind()
);
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
let output = child.wait_with_output().map_ttrpc_err_do(|e| {
warn!(
sl(),
"failure waiting for spawned {} to complete: {:?}",
cmd,
e.kind()
)
})?;
if !output.status.success() {
warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl
.list_interfaces()
.await
.map_err(|e| {
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list interfaces: {:?}", e),
)
})?;
.map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?;
Ok(protocols::agent::Interfaces {
Interfaces: list,
@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl
.list_routes()
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
.map_ttrpc_err(|e| format!("list routes: {:?}", e))?;
Ok(protocols::agent::Routes {
Routes: list,
@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService {
}
for m in req.kernel_modules.iter() {
load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
load_kernel_module(m).map_ttrpc_err(same)?;
}
s.setup_shared_namespaces()
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
}
match add_storages(sl(), &req.storages, &self.sandbox, None).await {
Ok(m) => {
self.sandbox.lock().await.mounts = m;
}
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
let m = add_storages(sl(), &req.storages, &self.sandbox, None)
.await
.map_ttrpc_err(same)?;
self.sandbox.lock().await.mounts = m;
match setup_guest_dns(sl(), &req.dns) {
Ok(_) => {
let mut s = self.sandbox.lock().await;
for dns in req.dns {
s.network.set_dns(dns);
}
setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?;
{
let mut s = self.sandbox.lock().await;
for dns in req.dns {
s.network.set_dns(dns);
}
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
}
Ok(Empty::new())
}
@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = self.sandbox.lock().await;
// destroy all containers, clean up, notify agent to exit etc.
sandbox
.destroy()
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
sandbox.destroy().await.map_ttrpc_err(same)?;
// Close get_oom_event connection,
// otherwise it will block the shutdown of ttrpc.
drop(sandbox.event_tx.take());
@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService {
sandbox
.sender
.take()
.ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INTERNAL,
"failed to get sandbox sender channel".to_string(),
)
})?
.map_ttrpc_err(
ttrpc::Code::INTERNAL,
"failed to get sandbox sender channel",
)?
.send(1)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
.map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService {
.neighbors
.into_option()
.map(|n| n.ARPNeighbors)
.ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty add arp neighbours request".to_string(),
)
})?;
.map_ttrpc_err(
ttrpc::Code::INVALID_ARGUMENT,
"empty add arp neighbours request",
)?;
self.sandbox
.lock()
@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl
.add_arp_neighbors(neighs)
.await
.map_err(|e| {
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to add ARP neighbours: {:?}", e),
)
})?;
.map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?;
Ok(Empty::new())
}
@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req).await?;
let sandbox = self.sandbox.lock().await;
sandbox
.online_cpu_memory(&req)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "reseed_random_dev", req);
is_allowed(&req).await?;
random::reseed_rng(req.data.as_slice())
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService {
info!(sl(), "get guest details!");
let mut resp = GuestDetailsResponse::new();
// to get memory block size
match get_memory_info(
let (u, v) = get_memory_info(
req.mem_block_size,
req.mem_hotplug_probe,
SYSFS_MEMORY_BLOCK_SIZE_PATH,
SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
) {
Ok((u, v)) => {
resp.mem_block_size_bytes = u;
resp.support_mem_hotplug_probe = v;
}
Err(e) => {
info!(sl(), "fail to get memory info!");
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
)
.map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?;
resp.mem_block_size_bytes = u;
resp.support_mem_hotplug_probe = v;
// to get agent details
let detail = get_agent_details();
@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
is_allowed(&req).await?;
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "set_guest_date_time", req);
is_allowed(&req).await?;
do_set_guest_date_time(req.Sec, req.Usec)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "copy_file", req);
is_allowed(&req).await?;
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
do_copy_file(&req).map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1450,14 +1371,10 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "get_metrics", req);
is_allowed(&req).await?;
match get_metrics(&req) {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(s) => {
let mut metrics = Metrics::new();
metrics.set_metrics(s);
Ok(metrics)
}
}
let s = get_metrics(&req).map_ttrpc_err(same)?;
let mut metrics = Metrics::new();
metrics.set_metrics(s);
Ok(metrics)
}
async fn get_oom_event(
@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService {
let mut event_rx = event_rx.lock().await;
drop(s);
if let Some(container_id) = event_rx.recv().await {
info!(sl(), "get_oom_event return {}", &container_id);
let container_id = event_rx
.recv()
.await
.map_ttrpc_err(ttrpc::Code::INTERNAL, "")?;
let mut resp = OOMEvent::new();
resp.container_id = container_id;
info!(sl(), "get_oom_event return {}", &container_id);
return Ok(resp);
}
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
let mut resp = OOMEvent::new();
resp.container_id = container_id;
Ok(resp)
}
async fn get_volume_stats(
@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService {
let mut resp = VolumeStatsResponse::new();
let mut condition = VolumeCondition::new();
match File::open(&req.volume_guest_path) {
Ok(_) => {
condition.abnormal = false;
condition.message = String::from("OK");
}
Err(e) => {
info!(sl(), "failed to open the volume");
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
File::open(&req.volume_guest_path)
.map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?;
condition.abnormal = false;
condition.message = String::from("OK");
let mut usage_vec = Vec::new();
// to get volume capacity stats
get_volume_capacity_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
.map_ttrpc_err(same)?;
// to get volume inode stats
get_volume_inode_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
.map_ttrpc_err(same)?;
resp.usage = usage_vec;
resp.volume_condition = MessageField::some(condition);
@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "add_swap", req);
is_allowed(&req).await?;
do_add_swap(&self.sandbox, &req)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?;
Ok(Empty::new())
}
@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService {
.await
.set_policy(&req.policy)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
.map_ttrpc_err(same)?;
Ok(Empty::new())
}