From 76dac8f22c86e4f40167149fb19c415dd629c0c0 Mon Sep 17 00:00:00 2001 From: Wedson Almeida Filho Date: Thu, 3 Aug 2023 22:08:09 -0300 Subject: [PATCH] agent: simplify error handling We extend the `Result` and `Option` types with associated types that allows converting a `Result` and `Option` into `ttrpc::Result`. This allows the elimination of many `match` statements in favor of calling the map function plus the `?` operator. This transformation simplifies the code. Fixes: #7624 Signed-off-by: Wedson Almeida Filho --- src/agent/src/rpc.rs | 450 +++++++++++++++++-------------------------- 1 file changed, 180 insertions(+), 270 deletions(-) diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 301daf55d5..e1b15d28c6 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf}; use tokio::sync::Mutex; use std::ffi::{CString, OsStr}; +use std::fmt::Debug; use std::io; use std::os::unix::ffi::OsStrExt; use std::path::Path; @@ -121,7 +122,7 @@ fn sl() -> slog::Logger { } // Convenience function to wrap an error and response to ttrpc client -fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error { +fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error { get_rpc_status(code, format!("{:?}", err)) } @@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result res } +fn same(e: E) -> E { + e +} + +trait ResultToTtrpcResult: Sized { + fn map_ttrpc_err(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result; + fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result { + self.map_ttrpc_err(|e| { + doer(&e); + e + }) + } +} + +impl ResultToTtrpcResult for Result { + fn map_ttrpc_err(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result { + self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e))) + } +} + +trait OptionToTtrpcResult: Sized { + fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result; +} + +impl OptionToTtrpcResult for Option { + fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result { + self.ok_or_else(|| ttrpc_error(code, msg)) + } +} + #[derive(Clone, Debug)] pub struct AgentService { sandbox: Arc>, @@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "create_container", req); is_allowed(&req).await?; - match self.do_create_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(_) => Ok(Empty::new()), - } + self.do_create_container(req).await.map_ttrpc_err(same)?; + Ok(Empty::new()) } async fn start_container( @@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "start_container", req); is_allowed(&req).await?; - match self.do_start_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(_) => Ok(Empty::new()), - } + self.do_start_container(req).await.map_ttrpc_err(same)?; + Ok(Empty::new()) } async fn remove_container( @@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_container", req); is_allowed(&req).await?; - match self.do_remove_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(_) => Ok(Empty::new()), - } + self.do_remove_container(req).await.map_ttrpc_err(same)?; + Ok(Empty::new()) } async fn exec_process( @@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "exec_process", req); is_allowed(&req).await?; - match self.do_exec_process(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(_) => Ok(Empty::new()), - } + self.do_exec_process(req).await.map_ttrpc_err(same)?; + Ok(Empty::new()) } async fn signal_process( @@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "signal_process", req); is_allowed(&req).await?; - match self.do_signal_process(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(_) => Ok(Empty::new()), - } + self.do_signal_process(req).await.map_ttrpc_err(same)?; + Ok(Empty::new()) } async fn wait_process( @@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "wait_process", req); is_allowed(&req).await?; - self.do_wait_process(req) - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) + self.do_wait_process(req).await.map_ttrpc_err(same) } async fn update_container( @@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req).await?; let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - ) - })?; - + let ctr = sandbox + .get_container(&req.container_id) + .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?; if let Some(res) = req.resources.as_ref() { let oci_res = rustjail::resources_grpc_to_oci(res); - if let Err(e) = ctr.set(oci_res) { - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } + ctr.set(oci_res).map_ttrpc_err(same)?; } Ok(Empty::new()) @@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req).await?; let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - ) - })?; - - ctr.stats() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) + let ctr = sandbox + .get_container(&req.container_id) + .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?; + ctr.stats().map_ttrpc_err(same) } async fn pause_container( @@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req).await?; let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - ) - })?; - - ctr.pause() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; - + let ctr = sandbox + .get_container(&req.container_id) + .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?; + ctr.pause().map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req).await?; let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - ) - })?; - - ctr.resume() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; - + let ctr = sandbox + .get_container(&req.container_id) + .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?; + ctr.resume().map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); is_allowed(&req).await?; - let mount_infos = parse_mount_table("/proc/self/mountinfo") - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?; for m in &mount_infos { if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) { // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more. @@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::WriteStreamRequest, ) -> ttrpc::Result { is_allowed(&req).await?; - self.do_write_stream(req) - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) + self.do_write_stream(req).await.map_ttrpc_err(same) } async fn read_stdout( @@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { is_allowed(&req).await?; - self.do_read_stream(req, true) - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) + self.do_read_stream(req, true).await.map_ttrpc_err(same) } async fn read_stderr( @@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { is_allowed(&req).await?; - self.do_read_stream(req, false) - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) + self.do_read_stream(req, false).await.map_ttrpc_err(same) } async fn close_stdin( @@ -902,23 +891,20 @@ impl agent_ttrpc::AgentService for AgentService { ) })?; - if let Some(fd) = p.term_master { - unsafe { - let win = winsize { - ws_row: req.row as c_ushort, - ws_col: req.column as c_ushort, - ws_xpixel: 0, - ws_ypixel: 0, - }; + let fd = p + .term_master + .map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?; + let win = winsize { + ws_row: req.row as c_ushort, + ws_col: req.column as c_ushort, + ws_xpixel: 0, + ws_ypixel: 0, + }; - let err = libc::ioctl(fd, TIOCSWINSZ, &win); - Errno::result(err).map(drop).map_err(|e| { - ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) - })?; - } - } else { - return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); - } + let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) }; + Errno::result(err) + .map(drop) + .map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?; Ok(Empty::new()) } @@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "update_interface", req); is_allowed(&req).await?; - let interface = req.interface.into_option().ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "empty update interface request".to_string(), - ) - })?; + let interface = req.interface.into_option().map_ttrpc_err( + ttrpc::Code::INVALID_ARGUMENT, + "empty update interface request", + )?; self.sandbox .lock() @@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .update_interface(&interface) .await - .map_err(|e| { - ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) - })?; + .map_ttrpc_err(|e| format!("update interface: {:?}", e))?; Ok(interface) } @@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "update_routes", req); is_allowed(&req).await?; - let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "empty update routes request".to_string(), - ) - })?; + let new_routes = req + .routes + .into_option() + .map(|r| r.Routes) + .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?; let mut sandbox = self.sandbox.lock().await; - sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { - ttrpc_error( - ttrpc::Code::INTERNAL, - format!("Failed to update routes: {:?}", e), - ) - })?; + sandbox + .rtnl + .update_routes(new_routes) + .await + .map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?; - let list = sandbox.rtnl.list_routes().await.map_err(|e| { - ttrpc_error( - ttrpc::Code::INTERNAL, - format!("Failed to list routes after update: {:?}", e), - ) - })?; + let list = sandbox + .rtnl + .list_routes() + .await + .map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?; Ok(protocols::agent::Routes { Routes: list, @@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "update_mounts", req); is_allowed(&req).await?; - match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await { - Ok(_) => Ok(Empty::new()), - Err(e) => Err(ttrpc_error( - ttrpc::Code::INTERNAL, - format!("Failed to update mounts: {:?}", e), - )), - } + update_ephemeral_mounts(sl(), &req.storages, &self.sandbox) + .await + .map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?; + Ok(Empty::new()) } async fn get_ip_tables( @@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService { } .to_string(); - match Command::new(cmd.clone()).output() { - Ok(output) => Ok(GetIPTablesResponse { - data: output.stdout, - ..Default::default() - }), - Err(e) => { - warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - } + let output = Command::new(cmd.clone()) + .output() + .map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?; + Ok(GetIPTablesResponse { + data: output.stdout, + ..Default::default() + }) } async fn set_ip_tables( @@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService { } .to_string(); - let mut child = match Command::new(cmd.clone()) + let mut child = Command::new(cmd.clone()) .arg("--wait") .arg(IPTABLES_RESTORE_WAIT_SEC.to_string()) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() - { - Ok(child) => child, - Err(e) => { - warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - }; + .map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?; let mut stdin = match child.stdin.take() { Some(si) => si, @@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService { println!("failed to get stdin from child"); return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "failed to take stdin from child".to_string(), + "failed to take stdin from child", )); } }; @@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService { }; }); - if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx) + let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx) .await - .is_err() - { - return Err(ttrpc_error( - ttrpc::Code::INTERNAL, - "timeout waiting for stdin writer to complete".to_string(), - )); - } + .map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?; - if handle.await.is_err() { - return Err(ttrpc_error( - ttrpc::Code::INTERNAL, - "stdin writer thread failure".to_string(), - )); - } + handle + .await + .map_ttrpc_err(|_| "stdin writer thread failure")?; - let output = match child.wait_with_output() { - Ok(o) => o, - Err(e) => { - warn!( - sl(), - "failure waiting for spawned {} to complete: {:?}", - cmd, - e.kind() - ); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - }; + let output = child.wait_with_output().map_ttrpc_err_do(|e| { + warn!( + sl(), + "failure waiting for spawned {} to complete: {:?}", + cmd, + e.kind() + ) + })?; if !output.status.success() { warn!(sl(), "{} failed: {:?}", cmd, output.stderr); @@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .list_interfaces() .await - .map_err(|e| { - ttrpc_error( - ttrpc::Code::INTERNAL, - format!("Failed to list interfaces: {:?}", e), - ) - })?; + .map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?; Ok(protocols::agent::Interfaces { Interfaces: list, @@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .list_routes() .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; + .map_ttrpc_err(|e| format!("list routes: {:?}", e))?; Ok(protocols::agent::Routes { Routes: list, @@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService { } for m in req.kernel_modules.iter() { - load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + load_kernel_module(m).map_ttrpc_err(same)?; } - s.setup_shared_namespaces() - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + s.setup_shared_namespaces().await.map_ttrpc_err(same)?; } - match add_storages(sl(), &req.storages, &self.sandbox, None).await { - Ok(m) => { - self.sandbox.lock().await.mounts = m; - } - Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - }; + let m = add_storages(sl(), &req.storages, &self.sandbox, None) + .await + .map_ttrpc_err(same)?; + self.sandbox.lock().await.mounts = m; - match setup_guest_dns(sl(), &req.dns) { - Ok(_) => { - let mut s = self.sandbox.lock().await; - for dns in req.dns { - s.network.set_dns(dns); - } + setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?; + { + let mut s = self.sandbox.lock().await; + for dns in req.dns { + s.network.set_dns(dns); } - Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - }; + } Ok(Empty::new()) } @@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = self.sandbox.lock().await; // destroy all containers, clean up, notify agent to exit etc. - sandbox - .destroy() - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + sandbox.destroy().await.map_ttrpc_err(same)?; // Close get_oom_event connection, // otherwise it will block the shutdown of ttrpc. drop(sandbox.event_tx.take()); @@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService { sandbox .sender .take() - .ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INTERNAL, - "failed to get sandbox sender channel".to_string(), - ) - })? + .map_ttrpc_err( + ttrpc::Code::INTERNAL, + "failed to get sandbox sender channel", + )? .send(1) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + .map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService { .neighbors .into_option() .map(|n| n.ARPNeighbors) - .ok_or_else(|| { - ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "empty add arp neighbours request".to_string(), - ) - })?; + .map_ttrpc_err( + ttrpc::Code::INVALID_ARGUMENT, + "empty add arp neighbours request", + )?; self.sandbox .lock() @@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .add_arp_neighbors(neighs) .await - .map_err(|e| { - ttrpc_error( - ttrpc::Code::INTERNAL, - format!("Failed to add ARP neighbours: {:?}", e), - ) - })?; + .map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?; Ok(Empty::new()) } @@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req).await?; let sandbox = self.sandbox.lock().await; - sandbox - .online_cpu_memory(&req) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "reseed_random_dev", req); is_allowed(&req).await?; - random::reseed_rng(req.data.as_slice()) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService { info!(sl(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); // to get memory block size - match get_memory_info( + let (u, v) = get_memory_info( req.mem_block_size, req.mem_hotplug_probe, SYSFS_MEMORY_BLOCK_SIZE_PATH, SYSFS_MEMORY_HOTPLUG_PROBE_PATH, - ) { - Ok((u, v)) => { - resp.mem_block_size_bytes = u; - resp.support_mem_hotplug_probe = v; - } - Err(e) => { - info!(sl(), "fail to get memory info!"); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - } + ) + .map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?; + + resp.mem_block_size_bytes = u; + resp.support_mem_hotplug_probe = v; // to get agent details let detail = get_agent_details(); @@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); is_allowed(&req).await?; - do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "set_guest_date_time", req); is_allowed(&req).await?; - do_set_guest_date_time(req.Sec, req.Usec) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "copy_file", req); is_allowed(&req).await?; - do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + do_copy_file(&req).map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1450,14 +1371,10 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "get_metrics", req); is_allowed(&req).await?; - match get_metrics(&req) { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), - Ok(s) => { - let mut metrics = Metrics::new(); - metrics.set_metrics(s); - Ok(metrics) - } - } + let s = get_metrics(&req).map_ttrpc_err(same)?; + let mut metrics = Metrics::new(); + metrics.set_metrics(s); + Ok(metrics) } async fn get_oom_event( @@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService { let mut event_rx = event_rx.lock().await; drop(s); - if let Some(container_id) = event_rx.recv().await { - info!(sl(), "get_oom_event return {}", &container_id); + let container_id = event_rx + .recv() + .await + .map_ttrpc_err(ttrpc::Code::INTERNAL, "")?; - let mut resp = OOMEvent::new(); - resp.container_id = container_id; + info!(sl(), "get_oom_event return {}", &container_id); - return Ok(resp); - } - - Err(ttrpc_error(ttrpc::Code::INTERNAL, "")) + let mut resp = OOMEvent::new(); + resp.container_id = container_id; + Ok(resp) } async fn get_volume_stats( @@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService { let mut resp = VolumeStatsResponse::new(); let mut condition = VolumeCondition::new(); - match File::open(&req.volume_guest_path) { - Ok(_) => { - condition.abnormal = false; - condition.message = String::from("OK"); - } - Err(e) => { - info!(sl(), "failed to open the volume"); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - }; + File::open(&req.volume_guest_path) + .map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?; + + condition.abnormal = false; + condition.message = String::from("OK"); let mut usage_vec = Vec::new(); // to get volume capacity stats get_volume_capacity_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + .map_ttrpc_err(same)?; // to get volume inode stats get_volume_inode_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + .map_ttrpc_err(same)?; resp.usage = usage_vec; resp.volume_condition = MessageField::some(condition); @@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "add_swap", req); is_allowed(&req).await?; - do_add_swap(&self.sandbox, &req) - .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?; Ok(Empty::new()) } @@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService { .await .set_policy(&req.policy) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; + .map_ttrpc_err(same)?; Ok(Empty::new()) }