diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index f45ce9236..444be723c 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -14,7 +14,7 @@ use std::path::Path; use std::sync::Arc; use ttrpc::{ self, - error::get_rpc_status as ttrpc_error, + error::get_rpc_status, r#async::{Server as TtrpcServer, TtrpcContext}, }; @@ -86,6 +86,13 @@ macro_rules! sl { }; } +// Convenience macro to wrap an error and response to ttrpc client +macro_rules! ttrpc_error { + ($code:path, $err:expr $(,)?) => { + get_rpc_status($code, format!("{:?}", $err)) + }; +} + macro_rules! is_allowed { ($req:ident) => { if !AGENT_CONFIG @@ -93,7 +100,7 @@ macro_rules! is_allowed { .await .is_allowed_endpoint($req.descriptor().name()) { - return Err(ttrpc_error( + return Err(ttrpc_error!( ttrpc::Code::UNIMPLEMENTED, format!("{} is blocked", $req.descriptor().name()), )); @@ -565,7 +572,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "create_container", req); is_allowed!(req); match self.do_create_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -578,7 +585,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "start_container", req); is_allowed!(req); match self.do_start_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -592,7 +599,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); match self.do_remove_container(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -605,7 +612,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "exec_process", req); is_allowed!(req); match self.do_exec_process(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -618,7 +625,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "signal_process", req); is_allowed!(req); match self.do_signal_process(req).await { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -632,7 +639,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); self.do_wait_process(req) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) } async fn update_container( @@ -649,7 +656,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) @@ -661,7 +668,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let oci_res = rustjail::resources_grpc_to_oci(res); match ctr.set(oci_res) { Err(e) => { - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())); + return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); } Ok(_) => return Ok(resp), @@ -683,14 +690,14 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.stats() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) } async fn pause_container( @@ -705,14 +712,14 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.pause() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -729,14 +736,14 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.resume() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -749,7 +756,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); self.do_write_stream(req) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) } async fn read_stdout( @@ -760,7 +767,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); self.do_read_stream(req, true) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) } async fn read_stderr( @@ -771,7 +778,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); self.do_read_stream(req, false) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) } async fn close_stdin( @@ -790,7 +797,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, format!("invalid argument: {:?}", e), ) @@ -816,7 +823,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::UNAVAILABLE, format!("invalid argument: {:?}", e), ) @@ -833,11 +840,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let err = libc::ioctl(fd, TIOCSWINSZ, &win); Errno::result(err).map(drop).map_err(|e| { - ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) + ttrpc_error!(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) })?; } } else { - return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); + return Err(ttrpc_error!(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); } Ok(Empty::new()) @@ -852,7 +859,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); let interface = req.interface.into_option().ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "empty update interface request".to_string(), ) @@ -865,7 +872,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .update_interface(&interface) .await .map_err(|e| { - ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) + ttrpc_error!(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) })?; Ok(interface) @@ -884,7 +891,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .into_option() .map(|r| r.Routes.into_vec()) .ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "empty update routes request".to_string(), ) @@ -893,14 +900,14 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut sandbox = self.sandbox.lock().await; sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INTERNAL, format!("Failed to update routes: {:?}", e), ) })?; let list = sandbox.rtnl.list_routes().await.map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INTERNAL, format!("Failed to list routes after update: {:?}", e), ) @@ -928,7 +935,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .list_interfaces() .await .map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INTERNAL, format!("Failed to list interfaces: {:?}", e), ) @@ -955,7 +962,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .rtnl .list_routes() .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; Ok(protocols::agent::Routes { Routes: RepeatedField::from_vec(list), @@ -995,13 +1002,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService { } for m in req.kernel_modules.iter() { - load_kernel_module(m) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + load_kernel_module(m).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; } s.setup_shared_namespaces() .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; } match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await { @@ -1010,7 +1016,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { let mut s = sandbox.lock().await; s.mounts = m } - Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), }; match setup_guest_dns(sl!(), req.dns.to_vec()) { @@ -1023,7 +1029,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .iter() .map(|dns| s.network.set_dns(dns.to_string())); } - Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), }; Ok(Empty::new()) @@ -1044,7 +1050,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { sandbox .destroy() .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; // Close get_oom_event connection, // otherwise it will block the shutdown of ttrpc. sandbox.event_tx.take(); @@ -1053,13 +1059,13 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .sender .take() .ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INTERNAL, "failed to get sandbox sender channel".to_string(), ) })? .send(1) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1077,7 +1083,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .into_option() .map(|n| n.ARPNeighbors.into_vec()) .ok_or_else(|| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INVALID_ARGUMENT, "empty add arp neighbours request".to_string(), ) @@ -1090,7 +1096,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { .add_arp_neighbors(neighs) .await .map_err(|e| { - ttrpc_error( + ttrpc_error!( ttrpc::Code::INTERNAL, format!("Failed to add ARP neighbours: {:?}", e), ) @@ -1111,7 +1117,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { sandbox .online_cpu_memory(&req) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1125,7 +1131,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); random::reseed_rng(req.data.as_slice()) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1148,7 +1154,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { } Err(e) => { info!(sl!(), "fail to get memory info!"); - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())); + return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); } } @@ -1168,7 +1174,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1182,7 +1188,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); do_set_guest_date_time(req.Sec, req.Usec) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1195,7 +1201,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "copy_file", req); is_allowed!(req); - do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1209,7 +1215,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { is_allowed!(req); match get_metrics(&req) { - Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), + Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(s) => { let mut metrics = Metrics::new(); metrics.set_metrics(s); @@ -1240,7 +1246,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { return Ok(resp); } - Err(ttrpc_error(ttrpc::Code::INTERNAL, "")) + Err(ttrpc_error!(ttrpc::Code::INTERNAL, "")) } async fn add_swap( @@ -1253,7 +1259,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService { do_add_swap(&self.sandbox, &req) .await - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) }