diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index fe8d6a1149..bcbb0c98ee 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -51,6 +51,14 @@ use crate::sandbox::Sandbox; use crate::version::{AGENT_VERSION, API_VERSION}; use crate::AGENT_CONFIG; +use crate::trace_rpc_call; +use crate::tracer::extract_carrier_from_ttrpc; +use opentelemetry::global; +use tracing::span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use tracing::instrument; + use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ}; use std::convert::TryFrom; use std::fs; @@ -74,7 +82,7 @@ macro_rules! sl { }; } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct AgentService { sandbox: Arc>, } @@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> { } impl AgentService { + #[instrument] async fn do_create_container( &self, req: protocols::agent::CreateContainerRequest, @@ -196,6 +205,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { let cid = req.container_id; @@ -221,6 +231,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_remove_container( &self, req: protocols::agent::RemoveContainerRequest, @@ -298,6 +309,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> { let cid = req.container_id.clone(); let exec_id = req.exec_id.clone(); @@ -326,6 +338,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); @@ -360,6 +373,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_wait_process( &self, req: protocols::agent::WaitProcessRequest, @@ -509,9 +523,10 @@ impl AgentService { impl protocols::agent_ttrpc::AgentService for AgentService { async fn create_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: 
protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "create_container", req); match self.do_create_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -520,9 +535,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn start_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "start_container", req); match self.do_start_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -531,9 +547,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn remove_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "remove_container", req); match self.do_remove_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -542,9 +559,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn exec_process( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "exec_process", req); match self.do_exec_process(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -553,9 +571,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn signal_process( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "signal_process", req); match self.do_signal_process(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -564,9 +583,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn wait_process( &self, - 
_ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "wait_process", req); self.do_wait_process(req) .await .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) @@ -574,9 +594,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_container", req); let cid = req.container_id.clone(); let res = req.resources; @@ -608,9 +629,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn stats_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::StatsContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "stats_container", req); let cid = req.container_id; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -628,9 +650,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn pause_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::PauseContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "pause_container", req); let cid = req.get_container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -650,9 +673,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn resume_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ResumeContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "resume_container", req); let cid = req.get_container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -702,9 +726,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn close_stdin( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "close_stdin", req); + let cid = 
req.container_id.clone(); let eid = req.exec_id; let s = Arc::clone(&self.sandbox); @@ -736,9 +762,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn tty_win_resize( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::TtyWinResizeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "tty_win_resize", req); + let cid = req.container_id.clone(); let eid = req.exec_id.clone(); let s = Arc::clone(&self.sandbox); @@ -774,9 +802,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_interface( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_interface", req); + let interface = req.interface.into_option().ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, @@ -799,9 +829,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_routes( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_routes", req); + let new_routes = req .routes .into_option() @@ -837,9 +869,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn list_interfaces( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::ListInterfacesRequest, + ctx: &TtrpcContext, + req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "list_interfaces", req); + let list = self .sandbox .lock() @@ -862,9 +896,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn list_routes( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::ListRoutesRequest, + ctx: &TtrpcContext, + req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "list_routes", req); + let list = self .sandbox .lock() @@ -899,9 +935,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn create_sandbox( &self, - _ctx: 
&TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CreateSandboxRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "create_sandbox", req); + { let sandbox = self.sandbox.clone(); let mut s = sandbox.lock().await; @@ -962,9 +1000,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn destroy_sandbox( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::DestroySandboxRequest, + ctx: &TtrpcContext, + req: protocols::agent::DestroySandboxRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "destroy_sandbox", req); + let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; // destroy all containers, clean up, notify agent to exit @@ -981,9 +1021,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn add_arp_neighbors( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "add_arp_neighbors", req); + let neighs = req .neighbors .into_option() @@ -1013,11 +1055,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn online_cpu_mem( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { let s = Arc::clone(&self.sandbox); let sandbox = s.lock().await; + trace_rpc_call!(ctx, "online_cpu_mem", req); sandbox .online_cpu_memory(&req) @@ -1028,9 +1071,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn reseed_random_dev( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "reseed_random_dev", req); + random::reseed_rng(req.data.as_slice()) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1039,9 +1084,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn get_guest_details( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::GuestDetailsRequest, ) -> ttrpc::Result { + 
trace_rpc_call!(ctx, "get_guest_details", req); + info!(sl!(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); // to get memory block size @@ -1065,9 +1112,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn mem_hotplug_by_probe( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); + do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1076,9 +1125,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn set_guest_date_time( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "set_guest_date_time", req); + do_set_guest_date_time(req.Sec, req.Usec) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1087,9 +1138,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn copy_file( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "copy_file", req); + do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) @@ -1097,9 +1150,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn get_metrics( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "get_metrics", req); + match get_metrics(&req) { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(s) => { diff --git a/src/agent/src/tracer.rs b/src/agent/src/tracer.rs index 4ae5111eb4..2e53fbea9c 100644 --- a/src/agent/src/tracer.rs +++ b/src/agent/src/tracer.rs @@ -5,14 +5,17 @@ use crate::config::AgentConfig; use anyhow::Result; +use opentelemetry::sdk::propagation::TraceContextPropagator; use 
opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
 use slog::{info, o, Logger};
+use std::collections::HashMap;
 use std::error::Error;
 use std::fmt;
 use std::str::FromStr;
 use tracing_opentelemetry::OpenTelemetryLayer;
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::Registry;
+use ttrpc::r#async::TtrpcContext;
 
 #[derive(Debug, PartialEq)]
 pub enum TraceType {
@@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
 
     tracing::subscriber::set_global_default(subscriber)?;
 
+    global::set_text_map_propagator(TraceContextPropagator::new());
+
     info!(logger, "tracing setup");
 
     Ok(())
@@ -89,3 +94,47 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
 pub fn end_tracing() {
     global::shutdown_tracer_provider();
 }
+
+/// Flattens the metadata of a ttrpc request into a carrier map.
+///
+/// Every metadata key is kept; its values are joined with `,` into one
+/// string. `trace_rpc_call!` hands the map to the global text map propagator.
+pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap<String, String> {
+    ttrpc_context
+        .metadata
+        .iter()
+        .map(|(k, v)| (k.clone(), v.join(",")))
+        .collect()
+}
+
+/// Starts a span for one RPC handler and links it to the caller's trace.
+///
+/// `$ctx` is the handler's ttrpc context, `$name` the span name, and `$req`
+/// the request, recorded through its `Debug` output in the `req` field.
+///
+/// The expansion uses `global`, `span!`, `extract_carrier_from_ttrpc` and
+/// `set_parent` by bare name, so the calling module has to import them.
+///
+/// NOTE(review): the `enter()` guard lives until the end of the calling
+/// block. The handlers using this macro are `async fn`s that `.await`
+/// afterwards; the `tracing` docs warn that a guard held across an await
+/// point may produce incorrect traces. Consider instrumenting the handler
+/// future with the span instead.
+/// NOTE(review): the `mod` field is always "rpc.rs", wherever the macro is
+/// invoked.
+#[macro_export]
+macro_rules! trace_rpc_call {
+    ($ctx: ident, $name:literal, $req: ident) => {
+        // extract context from request context
+        let parent_context = global::get_text_map_propagator(|propagator| {
+            propagator.extract(&extract_carrier_from_ttrpc($ctx))
+        });
+
+        // generate tracing span
+        let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req);
+
+        // assign parent span from external context
+        rpc_span.set_parent(parent_context);
+        let _enter = rpc_span.enter();
+    };
+}