diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index b437f1993a..518597f5c8 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -51,6 +51,14 @@ use crate::sandbox::Sandbox; use crate::version::{AGENT_VERSION, API_VERSION}; use crate::AGENT_CONFIG; +use crate::trace_rpc_call; +use crate::tracer::extract_carrier_from_ttrpc; +use opentelemetry::global; +use tracing::span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use tracing::instrument; + use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ}; use std::convert::TryFrom; use std::fs; @@ -74,7 +82,7 @@ macro_rules! sl { }; } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct AgentService { sandbox: Arc>, } @@ -97,6 +105,7 @@ fn verify_cid(id: &str) -> Result<()> { } impl AgentService { + #[instrument] async fn do_create_container( &self, req: protocols::agent::CreateContainerRequest, @@ -196,6 +205,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { let cid = req.container_id; @@ -221,6 +231,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_remove_container( &self, req: protocols::agent::RemoveContainerRequest, @@ -302,6 +313,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> { let cid = req.container_id.clone(); let exec_id = req.exec_id.clone(); @@ -330,6 +342,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); @@ -364,6 +377,7 @@ impl AgentService { Ok(()) } + #[instrument] async fn do_wait_process( &self, req: protocols::agent::WaitProcessRequest, @@ -513,9 +527,10 @@ impl AgentService { impl protocols::agent_ttrpc::AgentService for AgentService { async fn create_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "create_container", req); match self.do_create_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -524,9 +539,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn start_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "start_container", req); match self.do_start_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -535,9 +551,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn remove_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "remove_container", req); match self.do_remove_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -546,9 +563,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn exec_process( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "exec_process", req); match self.do_exec_process(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -557,9 +575,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn signal_process( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "signal_process", req); match self.do_signal_process(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), @@ -568,9 +587,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn wait_process( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "wait_process", req); self.do_wait_process(req) .await .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) @@ -578,9 +598,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_container", req); let cid = req.container_id.clone(); let res = req.resources; @@ -612,9 +633,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn stats_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::StatsContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "stats_container", req); let cid = req.container_id; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -632,9 +654,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn pause_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::PauseContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "pause_container", req); let cid = req.get_container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -654,9 +677,10 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn resume_container( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ResumeContainerRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "resume_container", req); let cid = req.get_container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -706,9 +730,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn close_stdin( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "close_stdin", req); + let cid = req.container_id.clone(); let eid = req.exec_id; let s = Arc::clone(&self.sandbox); @@ -740,9 +766,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn tty_win_resize( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::TtyWinResizeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "tty_win_resize", req); + let cid = req.container_id.clone(); let eid = req.exec_id.clone(); let s = Arc::clone(&self.sandbox); @@ -778,9 +806,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_interface( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_interface", req); + let interface = req.interface.into_option().ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, @@ -803,9 +833,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn update_routes( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "update_routes", req); + let new_routes = req .routes .into_option() @@ -841,9 +873,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn list_interfaces( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::ListInterfacesRequest, + ctx: &TtrpcContext, + req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "list_interfaces", req); + let list = self .sandbox .lock() @@ -866,9 +900,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn list_routes( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::ListRoutesRequest, + ctx: &TtrpcContext, + req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "list_routes", req); + let list = self .sandbox .lock() @@ -903,9 +939,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn create_sandbox( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CreateSandboxRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "create_sandbox", req); + { let sandbox = self.sandbox.clone(); let mut s = sandbox.lock().await; @@ -966,9 +1004,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn destroy_sandbox( &self, - _ctx: &TtrpcContext, - _req: protocols::agent::DestroySandboxRequest, + ctx: &TtrpcContext, + req: protocols::agent::DestroySandboxRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "destroy_sandbox", req); + let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; // destroy all containers, clean up, notify agent to exit @@ -985,9 +1025,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn add_arp_neighbors( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "add_arp_neighbors", req); + let neighs = req .neighbors .into_option() @@ -1017,11 +1059,12 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn online_cpu_mem( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { let s = Arc::clone(&self.sandbox); let sandbox = s.lock().await; + trace_rpc_call!(ctx, "online_cpu_mem", req); sandbox .online_cpu_memory(&req) @@ -1032,9 +1075,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn reseed_random_dev( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "reseed_random_dev", req); + random::reseed_rng(req.data.as_slice()) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1043,9 +1088,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn get_guest_details( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::GuestDetailsRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "get_guest_details", req); + info!(sl!(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); // to get memory block size @@ -1069,9 +1116,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn mem_hotplug_by_probe( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); + do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1080,9 +1129,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn set_guest_date_time( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "set_guest_date_time", req); + do_set_guest_date_time(req.Sec, req.Usec) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1091,9 +1142,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn copy_file( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "copy_file", req); + do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) @@ -1101,9 +1154,11 @@ impl protocols::agent_ttrpc::AgentService for AgentService { async fn get_metrics( &self, - _ctx: &TtrpcContext, + ctx: &TtrpcContext, req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "get_metrics", req); + match get_metrics(&req) { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(s) => { diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index a3413ece6e..92d5167da2 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -184,7 +184,6 @@ impl Sandbox { Ok(true) } - #[instrument] pub fn add_container(&mut self, c: LinuxContainer) { self.containers.insert(c.id.clone(), c); } @@ -213,12 +212,10 @@ impl Sandbox { Ok(()) } - #[instrument] pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> { self.containers.get_mut(id) } - #[instrument] pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> { for (_, c) in self.containers.iter_mut() { if c.processes.get(&pid).is_some() { diff --git a/src/agent/src/tracer.rs b/src/agent/src/tracer.rs index 4ae5111eb4..2e53fbea9c 100644 --- a/src/agent/src/tracer.rs +++ b/src/agent/src/tracer.rs @@ -5,14 +5,17 @@ use crate::config::AgentConfig; use anyhow::Result; +use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider}; use slog::{info, o, Logger}; +use std::collections::HashMap; use std::error::Error; use std::fmt; use std::str::FromStr; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::Registry; +use ttrpc::r#async::TtrpcContext; #[derive(Debug, PartialEq)] pub enum TraceType { @@ -81,6 +84,8 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf tracing::subscriber::set_global_default(subscriber)?; + global::set_text_map_propagator(TraceContextPropagator::new()); + info!(logger, "tracing setup"); Ok(()) @@ -89,3 +94,29 @@ pub fn setup_tracing(name: &'static str, logger: &Logger, _agent_cfg: &AgentConf pub fn end_tracing() { global::shutdown_tracer_provider(); } + +pub fn extract_carrier_from_ttrpc(ttrpc_context: &TtrpcContext) -> HashMap { + let mut carrier = HashMap::new(); + for (k, v) in &ttrpc_context.metadata { + carrier.insert(k.clone(), v.join(",")); + } + + carrier +} + +#[macro_export] +macro_rules! trace_rpc_call { + ($ctx: ident, $name:literal, $req: ident) => { + // extract context from request context + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&extract_carrier_from_ttrpc($ctx)) + }); + + // generate tracing span + let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req); + + // assign parent span from external context + rpc_span.set_parent(parent_context); + let _enter = rpc_span.enter(); + }; +} diff --git a/src/runtime/virtcontainers/agent.go b/src/runtime/virtcontainers/agent.go index b8576c35d5..51fdf67dce 100644 --- a/src/runtime/virtcontainers/agent.go +++ b/src/runtime/virtcontainers/agent.go @@ -163,7 +163,7 @@ type agent interface { configure(ctx context.Context, h hypervisor, id, sharePath string, config KataAgentConfig) error // configureFromGrpc will update agent settings based on provided arguments which from Grpc - configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error + configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error // reseedRNG will reseed the guest random number generator reseedRNG(ctx context.Context, data []byte) error diff --git a/src/runtime/virtcontainers/kata_agent.go b/src/runtime/virtcontainers/kata_agent.go index 693f1bd595..22f96ff911 100644 --- a/src/runtime/virtcontainers/kata_agent.go +++ b/src/runtime/virtcontainers/kata_agent.go @@ -32,6 +32,7 @@ import ( "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/label" + otelLabel "go.opentelemetry.io/otel/label" otelTrace "go.opentelemetry.io/otel/trace" "github.com/gogo/protobuf/proto" @@ -373,17 +374,25 @@ func (k *kataAgent) capabilities() types.Capabilities { return caps } -func (k *kataAgent) internalConfigure(h hypervisor, id string, config KataAgentConfig) error { +func (k *kataAgent) internalConfigure(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error { + span, _ := k.trace(ctx, "configure") + defer span.End() + var err error if k.vmSocket, err = h.generateSocket(id); err != nil { return err } k.keepConn = config.LongLiveConn + span.SetAttributes(otelLabel.Any("socket", k.vmSocket)) + return nil } -func (k *kataAgent) setupSandboxBindMounts(sandbox *Sandbox) (err error) { +func (k *kataAgent) setupSandboxBindMounts(ctx context.Context, sandbox *Sandbox) (err error) { + span, ctx := k.trace(ctx, "setupSandboxBindMounts") + defer span.End() + if len(sandbox.config.SandboxBindMounts) == 0 { return nil } @@ -412,13 +421,13 @@ func (k *kataAgent) setupSandboxBindMounts(sandbox *Sandbox) (err error) { for _, m := range sandbox.config.SandboxBindMounts { mountDest := filepath.Join(sandboxMountDir, filepath.Base(m)) // bind-mount each sandbox mount that's defined into the sandbox mounts dir - if err := bindMount(context.Background(), m, mountDest, true, "private"); err != nil { + if err := bindMount(ctx, m, mountDest, true, "private"); err != nil { return fmt.Errorf("Mounting sandbox directory: %v to %v: %w", m, mountDest, err) } mountedList = append(mountedList, mountDest) mountDest = filepath.Join(sandboxShareDir, filepath.Base(m)) - if err := remountRo(context.Background(), mountDest); err != nil { + if err := remountRo(ctx, mountDest); err != nil { return fmt.Errorf("remount sandbox directory: %v to %v: %w", m, mountDest, err) } @@ -454,7 +463,10 @@ func (k *kataAgent) cleanupSandboxBindMounts(sandbox *Sandbox) error { } func (k *kataAgent) configure(ctx context.Context, h hypervisor, id, sharePath string, config KataAgentConfig) error { - err := k.internalConfigure(h, id, config) + span, ctx := k.trace(ctx, "configure") + defer span.End() + + err := k.internalConfigure(ctx, h, id, config) if err != nil { return err } @@ -495,11 +507,14 @@ func (k *kataAgent) configure(ctx context.Context, h hypervisor, id, sharePath s return h.addDevice(ctx, sharedVolume, fsDev) } -func (k *kataAgent) configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error { - return k.internalConfigure(h, id, config) +func (k *kataAgent) configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error { + return k.internalConfigure(ctx, h, id, config) } func (k *kataAgent) setupSharedPath(ctx context.Context, sandbox *Sandbox) (err error) { + span, ctx := k.trace(ctx, "setupSharedPath") + defer span.End() + // create shared path structure sharePath := getSharePath(sandbox.id) mountPath := getMountPath(sandbox.id) @@ -523,7 +538,7 @@ func (k *kataAgent) setupSharedPath(ctx context.Context, sandbox *Sandbox) (err }() // Setup sandbox bindmounts, if specified: - if err = k.setupSandboxBindMounts(sandbox); err != nil { + if err = k.setupSandboxBindMounts(ctx, sandbox); err != nil { return err } @@ -2019,18 +2034,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { } } -func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) { - ctx = context.Background() +func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) { + newCtx = ctx switch reqName { case grpcWaitProcessRequest, grpcGetOOMEventRequest: // Wait and GetOOMEvent have no timeout case grpcCheckRequest: - ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout) + newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout) default: - ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout) + newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout) } - return ctx, cancel + return newCtx, cancel } func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) { @@ -2049,7 +2064,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter return nil, errors.New("Invalid request type") } message := request.(proto.Message) - ctx, cancel := k.getReqContext(msgName) + ctx, cancel := k.getReqContext(spanCtx, msgName) if cancel != nil { defer cancel() } diff --git a/src/runtime/virtcontainers/kata_agent_test.go b/src/runtime/virtcontainers/kata_agent_test.go index e58589f9cf..533ea5b711 100644 --- a/src/runtime/virtcontainers/kata_agent_test.go +++ b/src/runtime/virtcontainers/kata_agent_test.go @@ -1279,7 +1279,7 @@ func TestSandboxBindMount(t *testing.T) { defer syscall.Unmount(sharePath, syscall.MNT_DETACH|UmountNoFollow) // Test the function. We expect it to succeed and for the mount to exist - err = k.setupSandboxBindMounts(sandbox) + err = k.setupSandboxBindMounts(context.Background(), sandbox) assert.NoError(err) // Test the cleanup function. We expect it to succeed for the mount to be removed. @@ -1303,9 +1303,9 @@ func TestSandboxBindMount(t *testing.T) { // We expect cleanup to fail on the first time, since it cannot remove the sandbox-bindmount directory because // there are leftover mounts. If we run it a second time, however, it should succeed since it'll remove the // second set of mounts: - err = k.setupSandboxBindMounts(sandbox) + err = k.setupSandboxBindMounts(context.Background(), sandbox) assert.NoError(err) - err = k.setupSandboxBindMounts(sandbox) + err = k.setupSandboxBindMounts(context.Background(), sandbox) assert.NoError(err) // Test the cleanup function. We expect it to succeed for the mount to be removed. err = k.cleanupSandboxBindMounts(sandbox) @@ -1317,7 +1317,7 @@ func TestSandboxBindMount(t *testing.T) { // Now, let's setup the sandbox bindmount to fail, and verify that no mounts are left behind // sandbox.config.SandboxBindMounts = append(sandbox.config.SandboxBindMounts, "oh-nos") - err = k.setupSandboxBindMounts(sandbox) + err = k.setupSandboxBindMounts(context.Background(), sandbox) assert.Error(err) // Verify there aren't any mounts left behind stat = syscall.Stat_t{} diff --git a/src/runtime/virtcontainers/mock_agent.go b/src/runtime/virtcontainers/mock_agent.go index f12093b389..17bd03d540 100644 --- a/src/runtime/virtcontainers/mock_agent.go +++ b/src/runtime/virtcontainers/mock_agent.go @@ -176,7 +176,7 @@ func (n *mockAgent) configure(ctx context.Context, h hypervisor, id, sharePath s return nil } -func (n *mockAgent) configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error { +func (n *mockAgent) configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error { return nil } diff --git a/src/runtime/virtcontainers/mount.go b/src/runtime/virtcontainers/mount.go index 322c53214d..061203826b 100644 --- a/src/runtime/virtcontainers/mount.go +++ b/src/runtime/virtcontainers/mount.go @@ -18,6 +18,7 @@ import ( merr "github.com/hashicorp/go-multierror" "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils" "github.com/sirupsen/logrus" + otelLabel "go.opentelemetry.io/otel/label" ) // DefaultShmSize is the default shm size to be used in case host @@ -258,11 +259,13 @@ func moveMount(ctx context.Context, source, destination string) error { func bindMount(ctx context.Context, source, destination string, readonly bool, pgtypes string) error { span, _ := trace(ctx, "bindMount") defer span.End() + span.SetAttributes(otelLabel.String("source", source), otelLabel.String("destination", destination)) absSource, destination, err := evalMountPath(source, destination) if err != nil { return err } + span.SetAttributes(otelLabel.String("source_after_eval", absSource)) if err := syscall.Mount(absSource, destination, "bind", syscall.MS_BIND, ""); err != nil { return fmt.Errorf("Could not bind mount %v to %v: %v", absSource, destination, err) @@ -291,10 +294,15 @@ func bindMount(ctx context.Context, source, destination string, readonly bool, p // The mountflags should match the values used in the original mount() call, // except for those parameters that you are trying to change. func remount(ctx context.Context, mountflags uintptr, src string) error { + span, _ := trace(ctx, "remount") + defer span.End() + span.SetAttributes(otelLabel.String("source", src)) + absSrc, err := filepath.EvalSymlinks(src) if err != nil { return fmt.Errorf("Could not resolve symlink for %s", src) } + span.SetAttributes(otelLabel.String("source_after_eval", absSrc)) if err := syscall.Mount(absSrc, absSrc, "", syscall.MS_REMOUNT|mountflags, ""); err != nil { return fmt.Errorf("remount %s failed: %v", absSrc, err) @@ -353,6 +361,7 @@ func isSymlink(path string) bool { func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) error { span, _ := trace(ctx, "bindUnmountContainerRootfs") defer span.End() + span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("container_id", cID)) rootfsDest := filepath.Join(sharedDir, cID, rootfsDir) if isSymlink(filepath.Join(sharedDir, cID)) || isSymlink(rootfsDest) { @@ -375,6 +384,7 @@ func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) erro func bindUnmountAllRootfs(ctx context.Context, sharedDir string, sandbox *Sandbox) error { span, ctx := trace(ctx, "bindUnmountAllRootfs") defer span.End() + span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("sandbox_id", sandbox.id)) var errors *merr.Error for _, c := range sandbox.containers { diff --git a/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go b/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go index 50e4b9f8ba..362dca0ad2 100644 --- a/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go +++ b/src/runtime/virtcontainers/pkg/agent/protocols/client/client.go @@ -20,7 +20,11 @@ import ( "github.com/mdlayher/vsock" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" + otelLabel "go.opentelemetry.io/otel/label" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" grpcStatus "google.golang.org/grpc/status" "github.com/containerd/ttrpc" @@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli if err != nil { return nil, err } - /* - dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} - dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux))) - var tracer opentracing.Tracer - - span := opentracing.SpanFromContext(ctx) - - // If the context contains a trace span, trace all client comms - if span != nil { - tracer = span.Tracer() - - dialOpts = append(dialOpts, - grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer))) - dialOpts = append(dialOpts, - grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer))) - } - - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...) - if err != nil { - return nil, err - } - */ - client := ttrpc.NewClient(conn) + client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor())) return &AgentClient{ AgentServiceClient: agentgrpc.NewAgentServiceClient(client), @@ -119,6 +99,89 @@ func (c *AgentClient) Close() error { return c.conn.Close() } +func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor { + return func( + ctx context.Context, + req *ttrpc.Request, + resp *ttrpc.Response, + ci *ttrpc.UnaryClientInfo, + invoker ttrpc.Invoker, + ) error { + requestMetadata := make(ttrpc.MD) + + tracer := otel.Tracer("kata") + var span trace.Span + ctx, span = tracer.Start( + ctx, + fmt.Sprintf("ttrpc.%s", req.Method), + trace.WithSpanKind(trace.SpanKindClient), + ) + defer span.End() + + inject(ctx, &requestMetadata) + ctx = ttrpc.WithMetadata(ctx, requestMetadata) + setRequest(req, &requestMetadata) + + err := invoker(ctx, req, resp) + + if err != nil { + span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true)) + } + // err can be nil, that will return an OK response code + if status, _ := status.FromError(err); status != nil { + span.SetAttributes(otelLabel.Key("RPC_CODE").Uint((uint)(status.Code()))) + span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(status.Message())) + } + + return err + } +} + +type metadataSupplier struct { + metadata *ttrpc.MD +} + +func (s *metadataSupplier) Get(key string) string { + values, ok := s.metadata.Get(key) + if !ok { + return "" + } + return values[0] +} + +func (s *metadataSupplier) Set(key string, value string) { + s.metadata.Set(key, value) +} + +func inject(ctx context.Context, metadata *ttrpc.MD) { + otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{ + metadata: metadata, + }) + +} + +func setRequest(req *ttrpc.Request, md *ttrpc.MD) { + newMD := make([]*ttrpc.KeyValue, 0) + for _, kv := range req.Metadata { + // not found in md, means that we can copy old kv + // otherwise, we will use the values in md to overwrite it + if _, found := md.Get(kv.Key); !found { + newMD = append(newMD, kv) + } + } + + req.Metadata = newMD + + for k, values := range *md { + for _, v := range values { + req.Metadata = append(req.Metadata, &ttrpc.KeyValue{ + Key: k, + Value: v, + }) + } + } +} + // vsock scheme is self-defined to be kept from being parsed by grpc. // Any format starting with "scheme://" will be parsed by grpc and we lose // all address information because vsock scheme is not supported by grpc. diff --git a/src/runtime/virtcontainers/qemu.go b/src/runtime/virtcontainers/qemu.go index 94d6c96aac..d5e19d1f1d 100644 --- a/src/runtime/virtcontainers/qemu.go +++ b/src/runtime/virtcontainers/qemu.go @@ -1615,6 +1615,7 @@ func (q *qemu) hotplugDevice(ctx context.Context, devInfo interface{}, devType d func (q *qemu) hotplugAddDevice(ctx context.Context, devInfo interface{}, devType deviceType) (interface{}, error) { span, ctx := q.trace(ctx, "hotplugAddDevice") defer span.End() + span.SetAttributes(otelLabel.Any("device", devInfo)) data, err := q.hotplugDevice(ctx, devInfo, devType, addDevice) if err != nil { @@ -1627,6 +1628,7 @@ func (q *qemu) hotplugAddDevice(ctx context.Context, devInfo interface{}, devTyp func (q *qemu) hotplugRemoveDevice(ctx context.Context, devInfo interface{}, devType deviceType) (interface{}, error) { span, ctx := q.trace(ctx, "hotplugRemoveDevice") defer span.End() + span.SetAttributes(otelLabel.Any("device", devInfo)) data, err := q.hotplugDevice(ctx, devInfo, devType, removeDevice) if err != nil { @@ -1855,6 +1857,7 @@ func (q *qemu) addDevice(ctx context.Context, devInfo interface{}, devType devic var err error span, _ := q.trace(ctx, "addDevice") defer span.End() + span.SetAttributes(otelLabel.Any("device", devInfo)) switch v := devInfo.(type) { case types.Volume: diff --git a/src/runtime/virtcontainers/vm.go b/src/runtime/virtcontainers/vm.go index e6f02b6e07..556d0e408e 100644 --- a/src/runtime/virtcontainers/vm.go +++ b/src/runtime/virtcontainers/vm.go @@ -191,7 +191,7 @@ func NewVMFromGrpc(ctx context.Context, v *pb.GrpcVM, config VMConfig) (*VM, err // create agent instance newAagentFunc := getNewAgentFunc(ctx) agent := newAagentFunc() - agent.configureFromGrpc(hypervisor, v.Id, config.AgentConfig) + agent.configureFromGrpc(ctx, hypervisor, v.Id, config.AgentConfig) return &VM{ id: v.Id,