mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 15:02:45 +00:00
agent: add more instruments for RPC calls
All RPC calls can now extract the parent span context from the request and create new sub-spans under it, so that the full trace is connected.

Fixes: #1968

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
parent
ae46e7bf97
commit
cfb8139f36
@ -51,6 +51,14 @@ use crate::sandbox::Sandbox;
|
||||
use crate::version::{AGENT_VERSION, API_VERSION};
|
||||
use crate::AGENT_CONFIG;
|
||||
|
||||
use crate::trace_rpc_call;
|
||||
use crate::tracer::extract_carrier_from_ttrpc;
|
||||
use opentelemetry::global;
|
||||
use tracing::span;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
use tracing::instrument;
|
||||
|
||||
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
|
||||
use std::convert::TryFrom;
|
||||
use std::fs;
|
||||
@ -74,7 +82,7 @@ macro_rules! sl {
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AgentService {
|
||||
sandbox: Arc<Mutex<Sandbox>>,
|
||||
}
|
||||
@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> {
|
||||
}
|
||||
|
||||
impl AgentService {
|
||||
#[instrument]
|
||||
async fn do_create_container(
|
||||
&self,
|
||||
req: protocols::agent::CreateContainerRequest,
|
||||
@ -196,6 +205,7 @@ impl AgentService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
|
||||
let cid = req.container_id;
|
||||
|
||||
@ -221,6 +231,7 @@ impl AgentService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
async fn do_remove_container(
|
||||
&self,
|
||||
req: protocols::agent::RemoveContainerRequest,
|
||||
@ -298,6 +309,7 @@ impl AgentService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
|
||||
let cid = req.container_id.clone();
|
||||
let exec_id = req.exec_id.clone();
|
||||
@ -326,6 +338,7 @@ impl AgentService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
|
||||
let cid = req.container_id.clone();
|
||||
let eid = req.exec_id.clone();
|
||||
@ -360,6 +373,7 @@ impl AgentService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
async fn do_wait_process(
|
||||
&self,
|
||||
req: protocols::agent::WaitProcessRequest,
|
||||
@ -509,9 +523,10 @@ impl AgentService {
|
||||
impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
async fn create_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::CreateContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "create_container", req);
|
||||
match self.do_create_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -520,9 +535,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn start_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::StartContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "start_container", req);
|
||||
match self.do_start_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -531,9 +547,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn remove_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::RemoveContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "remove_container", req);
|
||||
match self.do_remove_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -542,9 +559,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn exec_process(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::ExecProcessRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "exec_process", req);
|
||||
match self.do_exec_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -553,9 +571,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn signal_process(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::SignalProcessRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "signal_process", req);
|
||||
match self.do_signal_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -564,9 +583,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn wait_process(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::WaitProcessRequest,
|
||||
) -> ttrpc::Result<WaitProcessResponse> {
|
||||
trace_rpc_call!(ctx, "wait_process", req);
|
||||
self.do_wait_process(req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||
@ -574,9 +594,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn update_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::UpdateContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "update_container", req);
|
||||
let cid = req.container_id.clone();
|
||||
let res = req.resources;
|
||||
|
||||
@ -608,9 +629,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn stats_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::StatsContainerRequest,
|
||||
) -> ttrpc::Result<StatsContainerResponse> {
|
||||
trace_rpc_call!(ctx, "stats_container", req);
|
||||
let cid = req.container_id;
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -628,9 +650,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn pause_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::PauseContainerRequest,
|
||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||
trace_rpc_call!(ctx, "pause_container", req);
|
||||
let cid = req.get_container_id();
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -650,9 +673,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn resume_container(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::ResumeContainerRequest,
|
||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||
trace_rpc_call!(ctx, "resume_container", req);
|
||||
let cid = req.get_container_id();
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -702,9 +726,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn close_stdin(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::CloseStdinRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "close_stdin", req);
|
||||
|
||||
let cid = req.container_id.clone();
|
||||
let eid = req.exec_id;
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
@ -736,9 +762,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn tty_win_resize(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::TtyWinResizeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "tty_win_resize", req);
|
||||
|
||||
let cid = req.container_id.clone();
|
||||
let eid = req.exec_id.clone();
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
@ -774,9 +802,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn update_interface(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::UpdateInterfaceRequest,
|
||||
) -> ttrpc::Result<Interface> {
|
||||
trace_rpc_call!(ctx, "update_interface", req);
|
||||
|
||||
let interface = req.interface.into_option().ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
@ -799,9 +829,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn update_routes(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::UpdateRoutesRequest,
|
||||
) -> ttrpc::Result<Routes> {
|
||||
trace_rpc_call!(ctx, "update_routes", req);
|
||||
|
||||
let new_routes = req
|
||||
.routes
|
||||
.into_option()
|
||||
@ -837,9 +869,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn list_interfaces(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
_req: protocols::agent::ListInterfacesRequest,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::ListInterfacesRequest,
|
||||
) -> ttrpc::Result<Interfaces> {
|
||||
trace_rpc_call!(ctx, "list_interfaces", req);
|
||||
|
||||
let list = self
|
||||
.sandbox
|
||||
.lock()
|
||||
@ -862,9 +896,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn list_routes(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
_req: protocols::agent::ListRoutesRequest,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::ListRoutesRequest,
|
||||
) -> ttrpc::Result<Routes> {
|
||||
trace_rpc_call!(ctx, "list_routes", req);
|
||||
|
||||
let list = self
|
||||
.sandbox
|
||||
.lock()
|
||||
@ -899,9 +935,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn create_sandbox(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::CreateSandboxRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "create_sandbox", req);
|
||||
|
||||
{
|
||||
let sandbox = self.sandbox.clone();
|
||||
let mut s = sandbox.lock().await;
|
||||
@ -962,9 +1000,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn destroy_sandbox(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
_req: protocols::agent::DestroySandboxRequest,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::DestroySandboxRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "destroy_sandbox", req);
|
||||
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
// destroy all containers, clean up, notify agent to exit
|
||||
@ -981,9 +1021,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn add_arp_neighbors(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::AddARPNeighborsRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "add_arp_neighbors", req);
|
||||
|
||||
let neighs = req
|
||||
.neighbors
|
||||
.into_option()
|
||||
@ -1013,11 +1055,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn online_cpu_mem(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::OnlineCPUMemRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let sandbox = s.lock().await;
|
||||
trace_rpc_call!(ctx, "online_cpu_mem", req);
|
||||
|
||||
sandbox
|
||||
.online_cpu_memory(&req)
|
||||
@ -1028,9 +1071,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn reseed_random_dev(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::ReseedRandomDevRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
||||
|
||||
random::reseed_rng(req.data.as_slice())
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
|
||||
@ -1039,9 +1084,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn get_guest_details(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::GuestDetailsRequest,
|
||||
) -> ttrpc::Result<GuestDetailsResponse> {
|
||||
trace_rpc_call!(ctx, "get_guest_details", req);
|
||||
|
||||
info!(sl!(), "get guest details!");
|
||||
let mut resp = GuestDetailsResponse::new();
|
||||
// to get memory block size
|
||||
@ -1065,9 +1112,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn mem_hotplug_by_probe(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::MemHotplugByProbeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
||||
|
||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
|
||||
@ -1076,9 +1125,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn set_guest_date_time(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::SetGuestDateTimeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
||||
|
||||
do_set_guest_date_time(req.Sec, req.Usec)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
|
||||
@ -1087,9 +1138,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn copy_file(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::CopyFileRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "copy_file", req);
|
||||
|
||||
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
|
||||
Ok(Empty::new())
|
||||
@ -1097,9 +1150,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
async fn get_metrics(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::GetMetricsRequest,
|
||||
) -> ttrpc::Result<Metrics> {
|
||||
trace_rpc_call!(ctx, "get_metrics", req);
|
||||
|
||||
match get_metrics(&req) {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(s) => {
|
||||
|
@ -5,14 +5,17 @@
|
||||
|
||||
use crate::config::AgentConfig;
|
||||
use anyhow::Result;
|
||||
use opentelemetry::sdk::propagation::TraceContextPropagator;
|
||||
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
|
||||
use slog::{info, o, Logger};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
use tracing_opentelemetry::OpenTelemetryLayer;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::Registry;
|
||||
use ttrpc::r#async::TtrpcContext;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum TraceType {
|
||||
@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
info!(logger, "tracing setup");
|
||||
|
||||
Ok(())
|
||||
@ -89,3 +94,29 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
|
||||
pub fn end_tracing() {
|
||||
global::shutdown_tracer_provider();
|
||||
}
|
||||
|
||||
pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap<String, String> {
|
||||
let mut carrier = HashMap::new();
|
||||
for (k, v) in &ttrpc_context.metadata {
|
||||
carrier.insert(k.clone(), v.join(","));
|
||||
}
|
||||
|
||||
carrier
|
||||
}
|
||||
|
||||
/// Opens a tracing span for one RPC handler and links it to the caller's trace.
///
/// Arguments:
/// * `$ctx`  - identifier of the ttrpc request context; its metadata is handed
///   to the global text-map propagator to recover the parent context.
/// * `$name` - string literal used as the span name.
/// * `$req`  - identifier of the request value; it is recorded in the `req`
///   field of the span using its `Debug` representation.
///
/// The macro expands to plain statements, so the span guard it binds stays
/// alive until the end of the block that invokes the macro.
///
/// `global`, `span!`, `extract_carrier_from_ttrpc` and the `set_parent`
/// extension method are resolved at the call site and must be in scope there.
///
/// NOTE(review): the guard returned by `enter()` is kept across the `.await`
/// points of the async handlers that invoke this macro; the `tracing`
/// documentation warns that this can produce incorrect traces. Attaching the
/// span with `Instrument::instrument` would avoid that but requires changing
/// the call sites -- TODO confirm and follow up.
#[macro_export]
macro_rules! trace_rpc_call {
    ($ctx: ident, $name:literal, $req: ident) => {
        // Recover the remote (parent) context from the ttrpc request metadata.
        // The carrier map is only needed for the extraction itself, so it is
        // confined to this inner block.
        let remote_context = {
            let carrier = extract_carrier_from_ttrpc($ctx);
            global::get_text_map_propagator(|propagator| propagator.extract(&carrier))
        };

        // Create the span for this call and attach it to the remote parent
        // before entering it.
        let call_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req);
        call_span.set_parent(remote_context);
        let _span_guard = call_span.enter();
    };
}
|
||||
|
Loading…
Reference in New Issue
Block a user