agent: add more instruments for RPC calls

All RPC calls can get parent span context,
and create new sub-spans for the full trace.

Fixes: #1968

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
bin 2021-06-07 17:53:15 +08:00
parent ae46e7bf97
commit cfb8139f36
2 changed files with 116 additions and 30 deletions

View File

@ -51,6 +51,14 @@ use crate::sandbox::Sandbox;
use crate::version::{AGENT_VERSION, API_VERSION}; use crate::version::{AGENT_VERSION, API_VERSION};
use crate::AGENT_CONFIG; use crate::AGENT_CONFIG;
use crate::trace_rpc_call;
use crate::tracer::extract_carrier_from_ttrpc;
use opentelemetry::global;
use tracing::span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing::instrument;
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ}; use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fs; use std::fs;
@ -74,7 +82,7 @@ macro_rules! sl {
}; };
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct AgentService { pub struct AgentService {
sandbox: Arc<Mutex<Sandbox>>, sandbox: Arc<Mutex<Sandbox>>,
} }
@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> {
} }
impl AgentService { impl AgentService {
#[instrument]
async fn do_create_container( async fn do_create_container(
&self, &self,
req: protocols::agent::CreateContainerRequest, req: protocols::agent::CreateContainerRequest,
@ -196,6 +205,7 @@ impl AgentService {
Ok(()) Ok(())
} }
#[instrument]
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
let cid = req.container_id; let cid = req.container_id;
@ -221,6 +231,7 @@ impl AgentService {
Ok(()) Ok(())
} }
#[instrument]
async fn do_remove_container( async fn do_remove_container(
&self, &self,
req: protocols::agent::RemoveContainerRequest, req: protocols::agent::RemoveContainerRequest,
@ -298,6 +309,7 @@ impl AgentService {
Ok(()) Ok(())
} }
#[instrument]
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> { async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
let exec_id = req.exec_id.clone(); let exec_id = req.exec_id.clone();
@ -326,6 +338,7 @@ impl AgentService {
Ok(()) Ok(())
} }
#[instrument]
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
let eid = req.exec_id.clone(); let eid = req.exec_id.clone();
@ -360,6 +373,7 @@ impl AgentService {
Ok(()) Ok(())
} }
#[instrument]
async fn do_wait_process( async fn do_wait_process(
&self, &self,
req: protocols::agent::WaitProcessRequest, req: protocols::agent::WaitProcessRequest,
@ -509,9 +523,10 @@ impl AgentService {
impl protocols::agent_ttrpc::AgentService for AgentService { impl protocols::agent_ttrpc::AgentService for AgentService {
async fn create_container( async fn create_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::CreateContainerRequest, req: protocols::agent::CreateContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_container", req);
match self.do_create_container(req).await { match self.do_create_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -520,9 +535,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn start_container( async fn start_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::StartContainerRequest, req: protocols::agent::StartContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "start_container", req);
match self.do_start_container(req).await { match self.do_start_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -531,9 +547,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn remove_container( async fn remove_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::RemoveContainerRequest, req: protocols::agent::RemoveContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_container", req);
match self.do_remove_container(req).await { match self.do_remove_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -542,9 +559,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn exec_process( async fn exec_process(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::ExecProcessRequest, req: protocols::agent::ExecProcessRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "exec_process", req);
match self.do_exec_process(req).await { match self.do_exec_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -553,9 +571,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn signal_process( async fn signal_process(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::SignalProcessRequest, req: protocols::agent::SignalProcessRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "signal_process", req);
match self.do_signal_process(req).await { match self.do_signal_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -564,9 +583,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn wait_process( async fn wait_process(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::WaitProcessRequest, req: protocols::agent::WaitProcessRequest,
) -> ttrpc::Result<WaitProcessResponse> { ) -> ttrpc::Result<WaitProcessResponse> {
trace_rpc_call!(ctx, "wait_process", req);
self.do_wait_process(req) self.do_wait_process(req)
.await .await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
@ -574,9 +594,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_container( async fn update_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::UpdateContainerRequest, req: protocols::agent::UpdateContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "update_container", req);
let cid = req.container_id.clone(); let cid = req.container_id.clone();
let res = req.resources; let res = req.resources;
@ -608,9 +629,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn stats_container( async fn stats_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::StatsContainerRequest, req: protocols::agent::StatsContainerRequest,
) -> ttrpc::Result<StatsContainerResponse> { ) -> ttrpc::Result<StatsContainerResponse> {
trace_rpc_call!(ctx, "stats_container", req);
let cid = req.container_id; let cid = req.container_id;
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
@ -628,9 +650,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn pause_container( async fn pause_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::PauseContainerRequest, req: protocols::agent::PauseContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "pause_container", req);
let cid = req.get_container_id(); let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
@ -650,9 +673,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn resume_container( async fn resume_container(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::ResumeContainerRequest, req: protocols::agent::ResumeContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "resume_container", req);
let cid = req.get_container_id(); let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
@ -702,9 +726,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn close_stdin( async fn close_stdin(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::CloseStdinRequest, req: protocols::agent::CloseStdinRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "close_stdin", req);
let cid = req.container_id.clone(); let cid = req.container_id.clone();
let eid = req.exec_id; let eid = req.exec_id;
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
@ -736,9 +762,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn tty_win_resize( async fn tty_win_resize(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::TtyWinResizeRequest, req: protocols::agent::TtyWinResizeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "tty_win_resize", req);
let cid = req.container_id.clone(); let cid = req.container_id.clone();
let eid = req.exec_id.clone(); let eid = req.exec_id.clone();
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
@ -774,9 +802,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_interface( async fn update_interface(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::UpdateInterfaceRequest, req: protocols::agent::UpdateInterfaceRequest,
) -> ttrpc::Result<Interface> { ) -> ttrpc::Result<Interface> {
trace_rpc_call!(ctx, "update_interface", req);
let interface = req.interface.into_option().ok_or_else(|| { let interface = req.interface.into_option().ok_or_else(|| {
ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
@ -799,9 +829,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_routes( async fn update_routes(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::UpdateRoutesRequest, req: protocols::agent::UpdateRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
trace_rpc_call!(ctx, "update_routes", req);
let new_routes = req let new_routes = req
.routes .routes
.into_option() .into_option()
@ -837,9 +869,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn list_interfaces( async fn list_interfaces(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
_req: protocols::agent::ListInterfacesRequest, req: protocols::agent::ListInterfacesRequest,
) -> ttrpc::Result<Interfaces> { ) -> ttrpc::Result<Interfaces> {
trace_rpc_call!(ctx, "list_interfaces", req);
let list = self let list = self
.sandbox .sandbox
.lock() .lock()
@ -862,9 +896,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn list_routes( async fn list_routes(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
_req: protocols::agent::ListRoutesRequest, req: protocols::agent::ListRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
trace_rpc_call!(ctx, "list_routes", req);
let list = self let list = self
.sandbox .sandbox
.lock() .lock()
@ -899,9 +935,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn create_sandbox( async fn create_sandbox(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::CreateSandboxRequest, req: protocols::agent::CreateSandboxRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_sandbox", req);
{ {
let sandbox = self.sandbox.clone(); let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await; let mut s = sandbox.lock().await;
@ -962,9 +1000,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn destroy_sandbox( async fn destroy_sandbox(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
_req: protocols::agent::DestroySandboxRequest, req: protocols::agent::DestroySandboxRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "destroy_sandbox", req);
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await; let mut sandbox = s.lock().await;
// destroy all containers, clean up, notify agent to exit // destroy all containers, clean up, notify agent to exit
@ -981,9 +1021,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn add_arp_neighbors( async fn add_arp_neighbors(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::AddARPNeighborsRequest, req: protocols::agent::AddARPNeighborsRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "add_arp_neighbors", req);
let neighs = req let neighs = req
.neighbors .neighbors
.into_option() .into_option()
@ -1013,11 +1055,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn online_cpu_mem( async fn online_cpu_mem(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::OnlineCPUMemRequest, req: protocols::agent::OnlineCPUMemRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
let sandbox = s.lock().await; let sandbox = s.lock().await;
trace_rpc_call!(ctx, "online_cpu_mem", req);
sandbox sandbox
.online_cpu_memory(&req) .online_cpu_memory(&req)
@ -1028,9 +1071,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn reseed_random_dev( async fn reseed_random_dev(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::ReseedRandomDevRequest, req: protocols::agent::ReseedRandomDevRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "reseed_random_dev", req);
random::reseed_rng(req.data.as_slice()) random::reseed_rng(req.data.as_slice())
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1039,9 +1084,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn get_guest_details( async fn get_guest_details(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::GuestDetailsRequest, req: protocols::agent::GuestDetailsRequest,
) -> ttrpc::Result<GuestDetailsResponse> { ) -> ttrpc::Result<GuestDetailsResponse> {
trace_rpc_call!(ctx, "get_guest_details", req);
info!(sl!(), "get guest details!"); info!(sl!(), "get guest details!");
let mut resp = GuestDetailsResponse::new(); let mut resp = GuestDetailsResponse::new();
// to get memory block size // to get memory block size
@ -1065,9 +1112,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn mem_hotplug_by_probe( async fn mem_hotplug_by_probe(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::MemHotplugByProbeRequest, req: protocols::agent::MemHotplugByProbeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1076,9 +1125,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn set_guest_date_time( async fn set_guest_date_time(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::SetGuestDateTimeRequest, req: protocols::agent::SetGuestDateTimeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "set_guest_date_time", req);
do_set_guest_date_time(req.Sec, req.Usec) do_set_guest_date_time(req.Sec, req.Usec)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1087,9 +1138,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn copy_file( async fn copy_file(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::CopyFileRequest, req: protocols::agent::CopyFileRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "copy_file", req);
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
Ok(Empty::new()) Ok(Empty::new())
@ -1097,9 +1150,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn get_metrics( async fn get_metrics(
&self, &self,
_ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::GetMetricsRequest, req: protocols::agent::GetMetricsRequest,
) -> ttrpc::Result<Metrics> { ) -> ttrpc::Result<Metrics> {
trace_rpc_call!(ctx, "get_metrics", req);
match get_metrics(&req) { match get_metrics(&req) {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(s) => { Ok(s) => {

View File

@ -5,14 +5,17 @@
use crate::config::AgentConfig; use crate::config::AgentConfig;
use anyhow::Result; use anyhow::Result;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider}; use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
use slog::{info, o, Logger}; use slog::{info, o, Logger};
use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer; use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry; use tracing_subscriber::Registry;
use ttrpc::r#async::TtrpcContext;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum TraceType { pub enum TraceType {
@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
tracing::subscriber::set_global_default(subscriber)?; tracing::subscriber::set_global_default(subscriber)?;
global::set_text_map_propagator(TraceContextPropagator::new());
info!(logger, "tracing setup"); info!(logger, "tracing setup");
Ok(()) Ok(())
@ -89,3 +94,29 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
pub fn end_tracing() { pub fn end_tracing() {
global::shutdown_tracer_provider(); global::shutdown_tracer_provider();
} }
pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap<String, String> {
let mut carrier = HashMap::new();
for (k, v) in &ttrpc_context.metadata {
carrier.insert(k.clone(), v.join(","));
}
carrier
}
#[macro_export]
macro_rules! trace_rpc_call {
($ctx: ident, $name:literal, $req: ident) => {
// extract context from request context
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&extract_carrier_from_ttrpc($ctx))
});
// generate tracing span
let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req);
// assign parent span from external context
rpc_span.set_parent(parent_context);
let _enter = rpc_span.enter();
};
}