agent: add more instruments for RPC calls

Every RPC call now extracts the parent span context from the incoming
request and creates a new sub-span under it, so the full trace is preserved.

Fixes: #1968

Signed-off-by: bin <bin@hyper.sh>
This commit is contained in:
bin 2021-06-07 17:53:15 +08:00
parent ae46e7bf97
commit cfb8139f36
2 changed files with 116 additions and 30 deletions

View File

@ -51,6 +51,14 @@ use crate::sandbox::Sandbox;
use crate::version::{AGENT_VERSION, API_VERSION};
use crate::AGENT_CONFIG;
use crate::trace_rpc_call;
use crate::tracer::extract_carrier_from_ttrpc;
use opentelemetry::global;
use tracing::span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing::instrument;
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
use std::convert::TryFrom;
use std::fs;
@ -74,7 +82,7 @@ macro_rules! sl {
};
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct AgentService {
sandbox: Arc<Mutex<Sandbox>>,
}
@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> {
}
impl AgentService {
#[instrument]
async fn do_create_container(
&self,
req: protocols::agent::CreateContainerRequest,
@ -196,6 +205,7 @@ impl AgentService {
Ok(())
}
#[instrument]
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
let cid = req.container_id;
@ -221,6 +231,7 @@ impl AgentService {
Ok(())
}
#[instrument]
async fn do_remove_container(
&self,
req: protocols::agent::RemoveContainerRequest,
@ -298,6 +309,7 @@ impl AgentService {
Ok(())
}
#[instrument]
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
let cid = req.container_id.clone();
let exec_id = req.exec_id.clone();
@ -326,6 +338,7 @@ impl AgentService {
Ok(())
}
#[instrument]
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
let cid = req.container_id.clone();
let eid = req.exec_id.clone();
@ -360,6 +373,7 @@ impl AgentService {
Ok(())
}
#[instrument]
async fn do_wait_process(
&self,
req: protocols::agent::WaitProcessRequest,
@ -509,9 +523,10 @@ impl AgentService {
impl protocols::agent_ttrpc::AgentService for AgentService {
async fn create_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::CreateContainerRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_container", req);
match self.do_create_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()),
@ -520,9 +535,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn start_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::StartContainerRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "start_container", req);
match self.do_start_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()),
@ -531,9 +547,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn remove_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::RemoveContainerRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_container", req);
match self.do_remove_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()),
@ -542,9 +559,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn exec_process(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::ExecProcessRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "exec_process", req);
match self.do_exec_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()),
@ -553,9 +571,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn signal_process(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::SignalProcessRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "signal_process", req);
match self.do_signal_process(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(_) => Ok(Empty::new()),
@ -564,9 +583,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn wait_process(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::WaitProcessRequest,
) -> ttrpc::Result<WaitProcessResponse> {
trace_rpc_call!(ctx, "wait_process", req);
self.do_wait_process(req)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
@ -574,9 +594,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::UpdateContainerRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "update_container", req);
let cid = req.container_id.clone();
let res = req.resources;
@ -608,9 +629,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn stats_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::StatsContainerRequest,
) -> ttrpc::Result<StatsContainerResponse> {
trace_rpc_call!(ctx, "stats_container", req);
let cid = req.container_id;
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
@ -628,9 +650,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn pause_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::PauseContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "pause_container", req);
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
@ -650,9 +673,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn resume_container(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::ResumeContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "resume_container", req);
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
@ -702,9 +726,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn close_stdin(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::CloseStdinRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "close_stdin", req);
let cid = req.container_id.clone();
let eid = req.exec_id;
let s = Arc::clone(&self.sandbox);
@ -736,9 +762,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn tty_win_resize(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::TtyWinResizeRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "tty_win_resize", req);
let cid = req.container_id.clone();
let eid = req.exec_id.clone();
let s = Arc::clone(&self.sandbox);
@ -774,9 +802,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_interface(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::UpdateInterfaceRequest,
) -> ttrpc::Result<Interface> {
trace_rpc_call!(ctx, "update_interface", req);
let interface = req.interface.into_option().ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT,
@ -799,9 +829,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn update_routes(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::UpdateRoutesRequest,
) -> ttrpc::Result<Routes> {
trace_rpc_call!(ctx, "update_routes", req);
let new_routes = req
.routes
.into_option()
@ -837,9 +869,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn list_interfaces(
&self,
_ctx: &TtrpcContext,
_req: protocols::agent::ListInterfacesRequest,
ctx: &TtrpcContext,
req: protocols::agent::ListInterfacesRequest,
) -> ttrpc::Result<Interfaces> {
trace_rpc_call!(ctx, "list_interfaces", req);
let list = self
.sandbox
.lock()
@ -862,9 +896,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn list_routes(
&self,
_ctx: &TtrpcContext,
_req: protocols::agent::ListRoutesRequest,
ctx: &TtrpcContext,
req: protocols::agent::ListRoutesRequest,
) -> ttrpc::Result<Routes> {
trace_rpc_call!(ctx, "list_routes", req);
let list = self
.sandbox
.lock()
@ -899,9 +935,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn create_sandbox(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::CreateSandboxRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "create_sandbox", req);
{
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await;
@ -962,9 +1000,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn destroy_sandbox(
&self,
_ctx: &TtrpcContext,
_req: protocols::agent::DestroySandboxRequest,
ctx: &TtrpcContext,
req: protocols::agent::DestroySandboxRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "destroy_sandbox", req);
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
// destroy all containers, clean up, notify agent to exit
@ -981,9 +1021,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn add_arp_neighbors(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::AddARPNeighborsRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "add_arp_neighbors", req);
let neighs = req
.neighbors
.into_option()
@ -1013,11 +1055,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn online_cpu_mem(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::OnlineCPUMemRequest,
) -> ttrpc::Result<Empty> {
let s = Arc::clone(&self.sandbox);
let sandbox = s.lock().await;
trace_rpc_call!(ctx, "online_cpu_mem", req);
sandbox
.online_cpu_memory(&req)
@ -1028,9 +1071,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn reseed_random_dev(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::ReseedRandomDevRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "reseed_random_dev", req);
random::reseed_rng(req.data.as_slice())
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1039,9 +1084,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn get_guest_details(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::GuestDetailsRequest,
) -> ttrpc::Result<GuestDetailsResponse> {
trace_rpc_call!(ctx, "get_guest_details", req);
info!(sl!(), "get guest details!");
let mut resp = GuestDetailsResponse::new();
// to get memory block size
@ -1065,9 +1112,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn mem_hotplug_by_probe(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::MemHotplugByProbeRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1076,9 +1125,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn set_guest_date_time(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::SetGuestDateTimeRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "set_guest_date_time", req);
do_set_guest_date_time(req.Sec, req.Usec)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1087,9 +1138,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn copy_file(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::CopyFileRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "copy_file", req);
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
Ok(Empty::new())
@ -1097,9 +1150,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn get_metrics(
&self,
_ctx: &TtrpcContext,
ctx: &TtrpcContext,
req: protocols::agent::GetMetricsRequest,
) -> ttrpc::Result<Metrics> {
trace_rpc_call!(ctx, "get_metrics", req);
match get_metrics(&req) {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
Ok(s) => {

View File

@ -5,14 +5,17 @@
use crate::config::AgentConfig;
use anyhow::Result;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
use slog::{info, o, Logger};
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
use ttrpc::r#async::TtrpcContext;
#[derive(Debug, PartialEq)]
pub enum TraceType {
@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
tracing::subscriber::set_global_default(subscriber)?;
global::set_text_map_propagator(TraceContextPropagator::new());
info!(logger, "tracing setup");
Ok(())
@ -89,3 +94,29 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf
/// Ends tracing by calling opentelemetry's `global::shutdown_tracer_provider()`.
///
/// NOTE(review): presumably meant to be invoked once while the agent is
/// shutting down so that pending spans get exported — confirm against the caller.
pub fn end_tracing() {
global::shutdown_tracer_provider();
}
/// Flattens the metadata of a ttrpc request context into a plain
/// `String -> String` carrier map.
///
/// Every metadata key is copied unchanged; the list of values stored under
/// that key is joined into a single string using `,` as the separator.
///
/// Returns a freshly allocated map; `ttrpc_context` is only read.
pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap<String, String> {
    ttrpc_context
        .metadata
        .iter()
        .map(|(key, values)| (key.clone(), values.join(",")))
        .collect()
}
/// Opens a tracing span for one RPC handler, parented to the caller's trace.
///
/// Arguments:
/// * `$ctx`  - identifier of the ttrpc context passed to
///   `extract_carrier_from_ttrpc`.
/// * `$name` - literal used as the span name.
/// * `$req`  - identifier of the request; recorded in the span's `req` field
///   with `Debug` formatting.
///
/// The expansion uses `global`, `span!`, `extract_carrier_from_ttrpc` and the
/// `set_parent` method unqualified, so all of them must be in scope at the
/// call site.
///
/// The macro expands to plain statements: the guard returned by `enter()` is
/// bound to a local, so the span stays entered until the end of the block in
/// which the macro is invoked.
///
/// NOTE(review): the handlers that use this macro are `async fn`s and `.await`
/// after the invocation, so the guard is held across await points. The
/// `tracing` documentation warns that this can produce incorrect traces;
/// instrumenting the future instead would change the macro's call contract,
/// so it is only flagged here.
#[macro_export]
macro_rules! trace_rpc_call {
    ($ctx: ident, $name:literal, $req: ident) => {
        // Turn the ttrpc metadata into a carrier the propagator can read.
        let carrier = extract_carrier_from_ttrpc($ctx);

        // Recover the remote (caller-side) context from that carrier.
        let remote_context =
            global::get_text_map_propagator(|propagator| propagator.extract(&carrier));

        // Create the span for this call and hang it under the remote parent.
        let call_span = span!(tracing::Level::INFO, $name, "mod" = "rpc.rs", req = ?$req);
        call_span.set_parent(remote_context);

        // Keep the span entered for the remainder of the enclosing block.
        let _span_guard = call_span.enter();
    };
}