mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 06:52:13 +00:00
agent: add more instruments for RPC calls
All RPC calls can get parent span context, and create new sub-spans for the full trace. Fixes: #1968 Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
ae46e7bf97
commit
cfb8139f36
@ -51,6 +51,14 @@ use crate::sandbox::Sandbox;
|
|||||||
use crate::version::{AGENT_VERSION, API_VERSION};
|
use crate::version::{AGENT_VERSION, API_VERSION};
|
||||||
use crate::AGENT_CONFIG;
|
use crate::AGENT_CONFIG;
|
||||||
|
|
||||||
|
use crate::trace_rpc_call;
|
||||||
|
use crate::tracer::extract_carrier_from_ttrpc;
|
||||||
|
use opentelemetry::global;
|
||||||
|
use tracing::span;
|
||||||
|
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||||
|
|
||||||
|
use tracing::instrument;
|
||||||
|
|
||||||
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
|
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
@ -74,7 +82,7 @@ macro_rules! sl {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct AgentService {
|
pub struct AgentService {
|
||||||
sandbox: Arc<Mutex<Sandbox>>,
|
sandbox: Arc<Mutex<Sandbox>>,
|
||||||
}
|
}
|
||||||
@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AgentService {
|
impl AgentService {
|
||||||
|
#[instrument]
|
||||||
async fn do_create_container(
|
async fn do_create_container(
|
||||||
&self,
|
&self,
|
||||||
req: protocols::agent::CreateContainerRequest,
|
req: protocols::agent::CreateContainerRequest,
|
||||||
@ -196,6 +205,7 @@ impl AgentService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
|
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
|
||||||
let cid = req.container_id;
|
let cid = req.container_id;
|
||||||
|
|
||||||
@ -221,6 +231,7 @@ impl AgentService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
async fn do_remove_container(
|
async fn do_remove_container(
|
||||||
&self,
|
&self,
|
||||||
req: protocols::agent::RemoveContainerRequest,
|
req: protocols::agent::RemoveContainerRequest,
|
||||||
@ -298,6 +309,7 @@ impl AgentService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
|
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let exec_id = req.exec_id.clone();
|
let exec_id = req.exec_id.clone();
|
||||||
@ -326,6 +338,7 @@ impl AgentService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
|
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let eid = req.exec_id.clone();
|
let eid = req.exec_id.clone();
|
||||||
@ -360,6 +373,7 @@ impl AgentService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
async fn do_wait_process(
|
async fn do_wait_process(
|
||||||
&self,
|
&self,
|
||||||
req: protocols::agent::WaitProcessRequest,
|
req: protocols::agent::WaitProcessRequest,
|
||||||
@ -509,9 +523,10 @@ impl AgentService {
|
|||||||
impl protocols::agent_ttrpc::AgentService for AgentService {
|
impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||||
async fn create_container(
|
async fn create_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::CreateContainerRequest,
|
req: protocols::agent::CreateContainerRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "create_container", req);
|
||||||
match self.do_create_container(req).await {
|
match self.do_create_container(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -520,9 +535,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn start_container(
|
async fn start_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::StartContainerRequest,
|
req: protocols::agent::StartContainerRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "start_container", req);
|
||||||
match self.do_start_container(req).await {
|
match self.do_start_container(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -531,9 +547,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn remove_container(
|
async fn remove_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::RemoveContainerRequest,
|
req: protocols::agent::RemoveContainerRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "remove_container", req);
|
||||||
match self.do_remove_container(req).await {
|
match self.do_remove_container(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -542,9 +559,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn exec_process(
|
async fn exec_process(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::ExecProcessRequest,
|
req: protocols::agent::ExecProcessRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "exec_process", req);
|
||||||
match self.do_exec_process(req).await {
|
match self.do_exec_process(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -553,9 +571,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn signal_process(
|
async fn signal_process(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::SignalProcessRequest,
|
req: protocols::agent::SignalProcessRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "signal_process", req);
|
||||||
match self.do_signal_process(req).await {
|
match self.do_signal_process(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -564,9 +583,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn wait_process(
|
async fn wait_process(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::WaitProcessRequest,
|
req: protocols::agent::WaitProcessRequest,
|
||||||
) -> ttrpc::Result<WaitProcessResponse> {
|
) -> ttrpc::Result<WaitProcessResponse> {
|
||||||
|
trace_rpc_call!(ctx, "wait_process", req);
|
||||||
self.do_wait_process(req)
|
self.do_wait_process(req)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||||
@ -574,9 +594,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn update_container(
|
async fn update_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::UpdateContainerRequest,
|
req: protocols::agent::UpdateContainerRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "update_container", req);
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let res = req.resources;
|
let res = req.resources;
|
||||||
|
|
||||||
@ -608,9 +629,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn stats_container(
|
async fn stats_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::StatsContainerRequest,
|
req: protocols::agent::StatsContainerRequest,
|
||||||
) -> ttrpc::Result<StatsContainerResponse> {
|
) -> ttrpc::Result<StatsContainerResponse> {
|
||||||
|
trace_rpc_call!(ctx, "stats_container", req);
|
||||||
let cid = req.container_id;
|
let cid = req.container_id;
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
@ -628,9 +650,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn pause_container(
|
async fn pause_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::PauseContainerRequest,
|
req: protocols::agent::PauseContainerRequest,
|
||||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||||
|
trace_rpc_call!(ctx, "pause_container", req);
|
||||||
let cid = req.get_container_id();
|
let cid = req.get_container_id();
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
@ -650,9 +673,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn resume_container(
|
async fn resume_container(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::ResumeContainerRequest,
|
req: protocols::agent::ResumeContainerRequest,
|
||||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||||
|
trace_rpc_call!(ctx, "resume_container", req);
|
||||||
let cid = req.get_container_id();
|
let cid = req.get_container_id();
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
@ -702,9 +726,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn close_stdin(
|
async fn close_stdin(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::CloseStdinRequest,
|
req: protocols::agent::CloseStdinRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "close_stdin", req);
|
||||||
|
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let eid = req.exec_id;
|
let eid = req.exec_id;
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
@ -736,9 +762,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn tty_win_resize(
|
async fn tty_win_resize(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::TtyWinResizeRequest,
|
req: protocols::agent::TtyWinResizeRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "tty_win_resize", req);
|
||||||
|
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id.clone();
|
||||||
let eid = req.exec_id.clone();
|
let eid = req.exec_id.clone();
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
@ -774,9 +802,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn update_interface(
|
async fn update_interface(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::UpdateInterfaceRequest,
|
req: protocols::agent::UpdateInterfaceRequest,
|
||||||
) -> ttrpc::Result<Interface> {
|
) -> ttrpc::Result<Interface> {
|
||||||
|
trace_rpc_call!(ctx, "update_interface", req);
|
||||||
|
|
||||||
let interface = req.interface.into_option().ok_or_else(|| {
|
let interface = req.interface.into_option().ok_or_else(|| {
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
@ -799,9 +829,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn update_routes(
|
async fn update_routes(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::UpdateRoutesRequest,
|
req: protocols::agent::UpdateRoutesRequest,
|
||||||
) -> ttrpc::Result<Routes> {
|
) -> ttrpc::Result<Routes> {
|
||||||
|
trace_rpc_call!(ctx, "update_routes", req);
|
||||||
|
|
||||||
let new_routes = req
|
let new_routes = req
|
||||||
.routes
|
.routes
|
||||||
.into_option()
|
.into_option()
|
||||||
@ -837,9 +869,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn list_interfaces(
|
async fn list_interfaces(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
_req: protocols::agent::ListInterfacesRequest,
|
req: protocols::agent::ListInterfacesRequest,
|
||||||
) -> ttrpc::Result<Interfaces> {
|
) -> ttrpc::Result<Interfaces> {
|
||||||
|
trace_rpc_call!(ctx, "list_interfaces", req);
|
||||||
|
|
||||||
let list = self
|
let list = self
|
||||||
.sandbox
|
.sandbox
|
||||||
.lock()
|
.lock()
|
||||||
@ -862,9 +896,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn list_routes(
|
async fn list_routes(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
_req: protocols::agent::ListRoutesRequest,
|
req: protocols::agent::ListRoutesRequest,
|
||||||
) -> ttrpc::Result<Routes> {
|
) -> ttrpc::Result<Routes> {
|
||||||
|
trace_rpc_call!(ctx, "list_routes", req);
|
||||||
|
|
||||||
let list = self
|
let list = self
|
||||||
.sandbox
|
.sandbox
|
||||||
.lock()
|
.lock()
|
||||||
@ -899,9 +935,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn create_sandbox(
|
async fn create_sandbox(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::CreateSandboxRequest,
|
req: protocols::agent::CreateSandboxRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "create_sandbox", req);
|
||||||
|
|
||||||
{
|
{
|
||||||
let sandbox = self.sandbox.clone();
|
let sandbox = self.sandbox.clone();
|
||||||
let mut s = sandbox.lock().await;
|
let mut s = sandbox.lock().await;
|
||||||
@ -962,9 +1000,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn destroy_sandbox(
|
async fn destroy_sandbox(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
_req: protocols::agent::DestroySandboxRequest,
|
req: protocols::agent::DestroySandboxRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "destroy_sandbox", req);
|
||||||
|
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
// destroy all containers, clean up, notify agent to exit
|
// destroy all containers, clean up, notify agent to exit
|
||||||
@ -981,9 +1021,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn add_arp_neighbors(
|
async fn add_arp_neighbors(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::AddARPNeighborsRequest,
|
req: protocols::agent::AddARPNeighborsRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "add_arp_neighbors", req);
|
||||||
|
|
||||||
let neighs = req
|
let neighs = req
|
||||||
.neighbors
|
.neighbors
|
||||||
.into_option()
|
.into_option()
|
||||||
@ -1013,11 +1055,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn online_cpu_mem(
|
async fn online_cpu_mem(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::OnlineCPUMemRequest,
|
req: protocols::agent::OnlineCPUMemRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
let s = Arc::clone(&self.sandbox);
|
let s = Arc::clone(&self.sandbox);
|
||||||
let sandbox = s.lock().await;
|
let sandbox = s.lock().await;
|
||||||
|
trace_rpc_call!(ctx, "online_cpu_mem", req);
|
||||||
|
|
||||||
sandbox
|
sandbox
|
||||||
.online_cpu_memory(&req)
|
.online_cpu_memory(&req)
|
||||||
@ -1028,9 +1071,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn reseed_random_dev(
|
async fn reseed_random_dev(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::ReseedRandomDevRequest,
|
req: protocols::agent::ReseedRandomDevRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
||||||
|
|
||||||
random::reseed_rng(req.data.as_slice())
|
random::reseed_rng(req.data.as_slice())
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||||
|
|
||||||
@ -1039,9 +1084,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn get_guest_details(
|
async fn get_guest_details(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::GuestDetailsRequest,
|
req: protocols::agent::GuestDetailsRequest,
|
||||||
) -> ttrpc::Result<GuestDetailsResponse> {
|
) -> ttrpc::Result<GuestDetailsResponse> {
|
||||||
|
trace_rpc_call!(ctx, "get_guest_details", req);
|
||||||
|
|
||||||
info!(sl!(), "get guest details!");
|
info!(sl!(), "get guest details!");
|
||||||
let mut resp = GuestDetailsResponse::new();
|
let mut resp = GuestDetailsResponse::new();
|
||||||
// to get memory block size
|
// to get memory block size
|
||||||
@ -1065,9 +1112,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn mem_hotplug_by_probe(
|
async fn mem_hotplug_by_probe(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::MemHotplugByProbeRequest,
|
req: protocols::agent::MemHotplugByProbeRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
||||||
|
|
||||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||||
|
|
||||||
@ -1076,9 +1125,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn set_guest_date_time(
|
async fn set_guest_date_time(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::SetGuestDateTimeRequest,
|
req: protocols::agent::SetGuestDateTimeRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
||||||
|
|
||||||
do_set_guest_date_time(req.Sec, req.Usec)
|
do_set_guest_date_time(req.Sec, req.Usec)
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||||
|
|
||||||
@ -1087,9 +1138,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn copy_file(
|
async fn copy_file(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::CopyFileRequest,
|
req: protocols::agent::CopyFileRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "copy_file", req);
|
||||||
|
|
||||||
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||||
|
|
||||||
Ok(Empty::new())
|
Ok(Empty::new())
|
||||||
@ -1097,9 +1150,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
async fn get_metrics(
|
async fn get_metrics(
|
||||||
&self,
|
&self,
|
||||||
_ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::GetMetricsRequest,
|
req: protocols::agent::GetMetricsRequest,
|
||||||
) -> ttrpc::Result<Metrics> {
|
) -> ttrpc::Result<Metrics> {
|
||||||
|
trace_rpc_call!(ctx, "get_metrics", req);
|
||||||
|
|
||||||
match get_metrics(&req) {
|
match get_metrics(&req) {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||||
Ok(s) => {
|
Ok(s) => {
|
||||||
|
@ -5,14 +5,17 @@
|
|||||||
|
|
||||||
use crate::config::AgentConfig;
|
use crate::config::AgentConfig;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use opentelemetry::sdk::propagation::TraceContextPropagator;
|
||||||
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
|
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
|
||||||
use slog::{info, o, Logger};
|
use slog::{info, o, Logger};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use tracing_opentelemetry::OpenTelemetryLayer;
|
use tracing_opentelemetry::OpenTelemetryLayer;
|
||||||
use tracing_subscriber::layer::SubscriberExt;
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
use tracing_subscriber::Registry;
|
use tracing_subscriber::Registry;
|
||||||
|
use ttrpc::r#async::TtrpcContext;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum TraceType {
|
pub enum TraceType {
|
||||||
@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
|
|||||||
|
|
||||||
tracing::subscriber::set_global_default(subscriber)?;
|
tracing::subscriber::set_global_default(subscriber)?;
|
||||||
|
|
||||||
|
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||||
|
|
||||||
info!(logger, "tracing setup");
|
info!(logger, "tracing setup");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -89,3 +94,29 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
|
|||||||
pub fn end_tracing() {
|
pub fn end_tracing() {
|
||||||
global::shutdown_tracer_provider();
|
global::shutdown_tracer_provider();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap<String, String> {
|
||||||
|
let mut carrier = HashMap::new();
|
||||||
|
for (k, v) in &ttrpc_context.metadata {
|
||||||
|
carrier.insert(k.clone(), v.join(","));
|
||||||
|
}
|
||||||
|
|
||||||
|
carrier
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! trace_rpc_call {
|
||||||
|
($ctx: ident, $name:literal, $req: ident) => {
|
||||||
|
// extract context from request context
|
||||||
|
let parent_context = global::get_text_map_propagator(|propagator| {
|
||||||
|
propagator.extract(&extract_carrier_from_ttrpc($ctx))
|
||||||
|
});
|
||||||
|
|
||||||
|
// generate tracing span
|
||||||
|
let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req);
|
||||||
|
|
||||||
|
// assign parent span from external context
|
||||||
|
rpc_span.set_parent(parent_context);
|
||||||
|
let _enter = rpc_span.enter();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user