agent: convert the ttrpc_error macro to a function

There is nothing in it that requires it to be a macro. Converting it to
a function allows for better error messages.

Fixes: #7201

Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
Wedson Almeida Filho 2023-06-28 13:21:00 -03:00
parent 0e5d6ce6d7
commit 0860fbd410

View File

@ -117,16 +117,14 @@ macro_rules! sl {
};
}
// Convenience macro to wrap an error and response to ttrpc client
macro_rules! ttrpc_error {
($code:path, $err:expr $(,)?) => {
get_rpc_status($code, format!("{:?}", $err))
};
// Convenience function to wrap an error and response to ttrpc client
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error {
get_rpc_status(code, format!("{:?}", err))
}
fn is_allowed(req: &impl MessageDyn) -> ttrpc::Result<()> {
if !AGENT_CONFIG.is_allowed_endpoint(req.descriptor_dyn().name()) {
Err(ttrpc_error!(
Err(ttrpc_error(
ttrpc::Code::UNIMPLEMENTED,
format!("{} is blocked", req.descriptor_dyn().name()),
))
@ -652,7 +650,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "create_container", req);
is_allowed(&req)?;
match self.do_create_container(req).await {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
}
@ -665,7 +663,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "start_container", req);
is_allowed(&req)?;
match self.do_start_container(req).await {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
}
@ -679,7 +677,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
match self.do_remove_container(req).await {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
}
@ -692,7 +690,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "exec_process", req);
is_allowed(&req)?;
match self.do_exec_process(req).await {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
}
@ -705,7 +703,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "signal_process", req);
is_allowed(&req)?;
match self.do_signal_process(req).await {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()),
}
}
@ -719,7 +717,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
self.do_wait_process(req)
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
}
async fn update_container(
@ -736,7 +734,7 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = s.lock().await;
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
@ -748,7 +746,7 @@ impl agent_ttrpc::AgentService for AgentService {
let oci_res = rustjail::resources_grpc_to_oci(res);
match ctr.set(oci_res) {
Err(e) => {
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
Ok(_) => return Ok(resp),
@ -770,14 +768,14 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = s.lock().await;
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.stats()
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
}
async fn pause_container(
@ -792,14 +790,14 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = s.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.pause()
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -816,14 +814,14 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = s.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)
})?;
ctr.resume()
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -836,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
is_allowed(&req)?;
let mount_infos = parse_mount_table("/proc/self/mountinfo")
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
for m in &mount_infos {
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
@ -859,7 +857,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
self.do_write_stream(req)
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
}
async fn read_stdout(
@ -870,7 +868,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
self.do_read_stream(req, true)
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
}
async fn read_stderr(
@ -881,7 +879,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
self.do_read_stream(req, false)
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
}
async fn close_stdin(
@ -900,7 +898,7 @@ impl agent_ttrpc::AgentService for AgentService {
let p = sandbox
.find_container_process(cid.as_str(), eid.as_str())
.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
format!("invalid argument: {:?}", e),
)
@ -926,7 +924,7 @@ impl agent_ttrpc::AgentService for AgentService {
let p = sandbox
.find_container_process(cid.as_str(), eid.as_str())
.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::UNAVAILABLE,
format!("invalid argument: {:?}", e),
)
@ -943,11 +941,11 @@ impl agent_ttrpc::AgentService for AgentService {
let err = libc::ioctl(fd, TIOCSWINSZ, &win);
Errno::result(err).map(drop).map_err(|e| {
ttrpc_error!(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
})?;
}
} else {
return Err(ttrpc_error!(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
}
Ok(Empty::new())
@ -962,7 +960,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
let interface = req.interface.into_option().ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty update interface request".to_string(),
)
@ -975,7 +973,7 @@ impl agent_ttrpc::AgentService for AgentService {
.update_interface(&interface)
.await
.map_err(|e| {
ttrpc_error!(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
})?;
Ok(interface)
@ -990,7 +988,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty update routes request".to_string(),
)
@ -999,14 +997,14 @@ impl agent_ttrpc::AgentService for AgentService {
let mut sandbox = self.sandbox.lock().await;
sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to update routes: {:?}", e),
)
})?;
let list = sandbox.rtnl.list_routes().await.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list routes after update: {:?}", e),
)
@ -1028,7 +1026,7 @@ impl agent_ttrpc::AgentService for AgentService {
match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await {
Ok(_) => Ok(Empty::new()),
Err(e) => Err(ttrpc_error!(
Err(e) => Err(ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to update mounts: {:?}", e),
)),
@ -1069,7 +1067,7 @@ impl agent_ttrpc::AgentService for AgentService {
}),
Err(e) => {
warn!(sl!(), "failed to run {}: {:?}", cmd, e.kind());
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
}
@ -1112,7 +1110,7 @@ impl agent_ttrpc::AgentService for AgentService {
Ok(child) => child,
Err(e) => {
warn!(sl!(), "failure to spawn {}: {:?}", cmd, e.kind());
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
@ -1120,9 +1118,9 @@ impl agent_ttrpc::AgentService for AgentService {
Some(si) => si,
None => {
println!("failed to get stdin from child");
return Err(ttrpc_error!(
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"failed to take stdin from child".to_string()
"failed to take stdin from child".to_string(),
));
}
};
@ -1145,16 +1143,16 @@ impl agent_ttrpc::AgentService for AgentService {
.await
.is_err()
{
return Err(ttrpc_error!(
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"timeout waiting for stdin writer to complete".to_string()
"timeout waiting for stdin writer to complete".to_string(),
));
}
if handle.await.is_err() {
return Err(ttrpc_error!(
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
"stdin writer thread failure".to_string()
"stdin writer thread failure".to_string(),
));
}
@ -1167,19 +1165,19 @@ impl agent_ttrpc::AgentService for AgentService {
cmd,
e.kind()
);
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
if !output.status.success() {
warn!(sl!(), "{} failed: {:?}", cmd, output.stderr);
return Err(ttrpc_error!(
return Err(ttrpc_error(
ttrpc::Code::INTERNAL,
format!(
"{} failed: {:?}",
cmd,
String::from_utf8_lossy(&output.stderr)
)
),
));
}
@ -1205,7 +1203,7 @@ impl agent_ttrpc::AgentService for AgentService {
.list_interfaces()
.await
.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list interfaces: {:?}", e),
)
@ -1232,7 +1230,7 @@ impl agent_ttrpc::AgentService for AgentService {
.rtnl
.list_routes()
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
Ok(protocols::agent::Routes {
Routes: list,
@ -1272,12 +1270,12 @@ impl agent_ttrpc::AgentService for AgentService {
}
for m in req.kernel_modules.iter() {
load_kernel_module(m).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
}
s.setup_shared_namespaces()
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
}
match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await {
@ -1286,7 +1284,7 @@ impl agent_ttrpc::AgentService for AgentService {
let mut s = sandbox.lock().await;
s.mounts = m
}
Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
match setup_guest_dns(sl!(), req.dns.to_vec()) {
@ -1299,7 +1297,7 @@ impl agent_ttrpc::AgentService for AgentService {
.iter()
.map(|dns| s.network.set_dns(dns.to_string()));
}
Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
Ok(Empty::new())
@ -1320,7 +1318,7 @@ impl agent_ttrpc::AgentService for AgentService {
sandbox
.destroy()
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
// Close get_oom_event connection,
// otherwise it will block the shutdown of ttrpc.
sandbox.event_tx.take();
@ -1329,13 +1327,13 @@ impl agent_ttrpc::AgentService for AgentService {
.sender
.take()
.ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INTERNAL,
"failed to get sandbox sender channel".to_string(),
)
})?
.send(1)
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1353,7 +1351,7 @@ impl agent_ttrpc::AgentService for AgentService {
.into_option()
.map(|n| n.ARPNeighbors)
.ok_or_else(|| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
"empty add arp neighbours request".to_string(),
)
@ -1366,7 +1364,7 @@ impl agent_ttrpc::AgentService for AgentService {
.add_arp_neighbors(neighs)
.await
.map_err(|e| {
ttrpc_error!(
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to add ARP neighbours: {:?}", e),
)
@ -1387,7 +1385,7 @@ impl agent_ttrpc::AgentService for AgentService {
sandbox
.online_cpu_memory(&req)
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1401,7 +1399,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
random::reseed_rng(req.data.as_slice())
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1429,7 +1427,7 @@ impl agent_ttrpc::AgentService for AgentService {
}
Err(e) => {
info!(sl!(), "fail to get memory info!");
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
}
@ -1449,7 +1447,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1463,7 +1461,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
do_set_guest_date_time(req.Sec, req.Usec)
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1476,7 +1474,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "copy_file", req);
is_allowed(&req)?;
do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}
@ -1490,7 +1488,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
match get_metrics(&req) {
Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)),
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(s) => {
let mut metrics = Metrics::new();
metrics.set_metrics(s);
@ -1521,7 +1519,7 @@ impl agent_ttrpc::AgentService for AgentService {
return Ok(resp);
}
Err(ttrpc_error!(ttrpc::Code::INTERNAL, ""))
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
}
async fn get_volume_stats(
@ -1544,7 +1542,7 @@ impl agent_ttrpc::AgentService for AgentService {
}
Err(e) => {
info!(sl!(), "failed to open the volume");
return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e));
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
}
};
@ -1553,12 +1551,12 @@ impl agent_ttrpc::AgentService for AgentService {
// to get volume capacity stats
get_volume_capacity_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
// to get volume inode stats
get_volume_inode_stats(&req.volume_guest_path)
.map(|u| usage_vec.push(u))
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
resp.usage = usage_vec;
resp.volume_condition = MessageField::some(condition);
@ -1575,7 +1573,7 @@ impl agent_ttrpc::AgentService for AgentService {
do_add_swap(&self.sandbox, &req)
.await
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
Ok(Empty::new())
}