diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index e9081c8df9..ef2c0f9e7f 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -117,16 +117,14 @@ macro_rules! sl { }; } -// Convenience macro to wrap an error and response to ttrpc client -macro_rules! ttrpc_error { - ($code:path, $err:expr $(,)?) => { - get_rpc_status($code, format!("{:?}", $err)) - }; +// Convenience function to wrap an error and response to ttrpc client +fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error { + get_rpc_status(code, format!("{:?}", err)) } fn is_allowed(req: &impl MessageDyn) -> ttrpc::Result<()> { if !AGENT_CONFIG.is_allowed_endpoint(req.descriptor_dyn().name()) { - Err(ttrpc_error!( + Err(ttrpc_error( ttrpc::Code::UNIMPLEMENTED, format!("{} is blocked", req.descriptor_dyn().name()), )) @@ -652,7 +650,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "create_container", req); is_allowed(&req)?; match self.do_create_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -665,7 +663,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "start_container", req); is_allowed(&req)?; match self.do_start_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -679,7 +677,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; match self.do_remove_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -692,7 +690,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "exec_process", req); is_allowed(&req)?; match self.do_exec_process(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -705,7 +703,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "signal_process", req); is_allowed(&req)?; match self.do_signal_process(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -719,7 +717,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_wait_process(req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn update_container( @@ -736,7 +734,7 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) @@ -748,7 +746,7 @@ impl agent_ttrpc::AgentService for AgentService { let oci_res = rustjail::resources_grpc_to_oci(res); match ctr.set(oci_res) { Err(e) => { - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } Ok(_) => return Ok(resp), @@ -770,14 +768,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.stats() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn pause_container( @@ -792,14 +790,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.pause() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -816,14 +814,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.resume() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -836,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); is_allowed(&req)?; let mount_infos = parse_mount_table("/proc/self/mountinfo") - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; for m in &mount_infos { if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) { // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more. @@ -859,7 +857,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_write_stream(req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn read_stdout( @@ -870,7 +868,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_read_stream(req, true) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn read_stderr( @@ -881,7 +879,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_read_stream(req, false) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn close_stdin( @@ -900,7 +898,7 @@ impl agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("invalid argument: {:?}", e), ) @@ -926,7 +924,7 @@ impl agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::UNAVAILABLE, format!("invalid argument: {:?}", e), ) @@ -943,11 +941,11 @@ impl agent_ttrpc::AgentService for AgentService { let err = libc::ioctl(fd, TIOCSWINSZ, &win); Errno::result(err).map(drop).map_err(|e| { - ttrpc_error!(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) + ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) })?; } } else { - return Err(ttrpc_error!(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); + return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); } Ok(Empty::new()) @@ -962,7 +960,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; let interface = req.interface.into_option().ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty update interface request".to_string(), ) @@ -975,7 +973,7 @@ impl agent_ttrpc::AgentService for AgentService { .update_interface(&interface) .await .map_err(|e| { - ttrpc_error!(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) + ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) })?; Ok(interface) @@ -990,7 +988,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty update routes request".to_string(), ) @@ -999,14 +997,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = self.sandbox.lock().await; sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to update routes: {:?}", e), ) })?; let list = sandbox.rtnl.list_routes().await.map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to list routes after update: {:?}", e), ) @@ -1028,7 +1026,7 @@ impl agent_ttrpc::AgentService for AgentService { match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await { Ok(_) => Ok(Empty::new()), - Err(e) => Err(ttrpc_error!( + Err(e) => Err(ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to update mounts: {:?}", e), )), @@ -1069,7 +1067,7 @@ impl agent_ttrpc::AgentService for AgentService { }), Err(e) => { warn!(sl!(), "failed to run {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } } @@ -1112,7 +1110,7 @@ impl agent_ttrpc::AgentService for AgentService { Ok(child) => child, Err(e) => { warn!(sl!(), "failure to spawn {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1120,9 +1118,9 @@ impl agent_ttrpc::AgentService for AgentService { Some(si) => si, None => { println!("failed to get stdin from child"); - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "failed to take stdin from child".to_string() + "failed to take stdin from child".to_string(), )); } }; @@ -1145,16 +1143,16 @@ impl agent_ttrpc::AgentService for AgentService { .await .is_err() { - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "timeout waiting for stdin writer to complete".to_string() + "timeout waiting for stdin writer to complete".to_string(), )); } if handle.await.is_err() { - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "stdin writer thread failure".to_string() + "stdin writer thread failure".to_string(), )); } @@ -1167,19 +1165,19 @@ impl agent_ttrpc::AgentService for AgentService { cmd, e.kind() ); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; if !output.status.success() { warn!(sl!(), "{} failed: {:?}", cmd, output.stderr); - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, format!( "{} failed: {:?}", cmd, String::from_utf8_lossy(&output.stderr) - ) + ), )); } @@ -1205,7 +1203,7 @@ impl agent_ttrpc::AgentService for AgentService { .list_interfaces() .await .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to list interfaces: {:?}", e), ) @@ -1232,7 +1230,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .list_routes() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; Ok(protocols::agent::Routes { Routes: list, @@ -1272,12 +1270,12 @@ impl agent_ttrpc::AgentService for AgentService { } for m in req.kernel_modules.iter() { - load_kernel_module(m).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } s.setup_shared_namespaces() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await { @@ -1286,7 +1284,7 @@ impl agent_ttrpc::AgentService for AgentService { let mut s = sandbox.lock().await; s.mounts = m } - Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; match setup_guest_dns(sl!(), req.dns.to_vec()) { @@ -1299,7 +1297,7 @@ impl agent_ttrpc::AgentService for AgentService { .iter() .map(|dns| s.network.set_dns(dns.to_string())); } - Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; Ok(Empty::new()) @@ -1320,7 +1318,7 @@ impl agent_ttrpc::AgentService for AgentService { sandbox .destroy() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; // Close get_oom_event connection, // otherwise it will block the shutdown of ttrpc. sandbox.event_tx.take(); @@ -1329,13 +1327,13 @@ impl agent_ttrpc::AgentService for AgentService { .sender .take() .ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, "failed to get sandbox sender channel".to_string(), ) })? .send(1) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1353,7 +1351,7 @@ impl agent_ttrpc::AgentService for AgentService { .into_option() .map(|n| n.ARPNeighbors) .ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty add arp neighbours request".to_string(), ) @@ -1366,7 +1364,7 @@ impl agent_ttrpc::AgentService for AgentService { .add_arp_neighbors(neighs) .await .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to add ARP neighbours: {:?}", e), ) @@ -1387,7 +1385,7 @@ impl agent_ttrpc::AgentService for AgentService { sandbox .online_cpu_memory(&req) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1401,7 +1399,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; random::reseed_rng(req.data.as_slice()) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1429,7 +1427,7 @@ impl agent_ttrpc::AgentService for AgentService { } Err(e) => { info!(sl!(), "fail to get memory info!"); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } @@ -1449,7 +1447,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1463,7 +1461,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; do_set_guest_date_time(req.Sec, req.Usec) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1476,7 +1474,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "copy_file", req); is_allowed(&req)?; - do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1490,7 +1488,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; match get_metrics(&req) { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(s) => { let mut metrics = Metrics::new(); metrics.set_metrics(s); @@ -1521,7 +1519,7 @@ impl agent_ttrpc::AgentService for AgentService { return Ok(resp); } - Err(ttrpc_error!(ttrpc::Code::INTERNAL, "")) + Err(ttrpc_error(ttrpc::Code::INTERNAL, "")) } async fn get_volume_stats( @@ -1544,7 +1542,7 @@ impl agent_ttrpc::AgentService for AgentService { } Err(e) => { info!(sl!(), "failed to open the volume"); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1553,12 +1551,12 @@ impl agent_ttrpc::AgentService for AgentService { // to get volume capacity stats get_volume_capacity_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; // to get volume inode stats get_volume_inode_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; resp.usage = usage_vec; resp.volume_condition = MessageField::some(condition); @@ -1575,7 +1573,7 @@ impl agent_ttrpc::AgentService for AgentService { do_add_swap(&self.sandbox, &req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) }