diff --git a/src/trace-forwarder/Cargo.toml b/src/trace-forwarder/Cargo.toml new file mode 100644 index 000000000..f0340558f --- /dev/null +++ b/src/trace-forwarder/Cargo.toml @@ -0,0 +1,32 @@ +# Copyright (c) 2020 Intel Corporation +# +# SPDX-License-Identifier: Apache-2.0 +# + +[package] +name = "kata-trace-forwarder" +version = "0.0.1" +authors = ["James O. D. Hunt "] +edition = "2018" + +[dependencies] +clap = "2.33.0" +vsock = "0.1.5" +nix = "0.15.0" +libc = "0.2.66" +serde = { version = "1.0.106", features = ["derive"] } +bincode = "1.2.1" +byteorder = "1.3.4" +serde_json = "1.0.44" +anyhow = "1.0.31" +opentelemetry = { version = "0.5.0", features=["serialize"] } +opentelemetry-jaeger = "0.4.0" +tracing-opentelemetry = "0.4.0" +tracing = "0.1.14" +tracing-subscriber = "0.2.5" + +logging = { path = "../../pkg/logging" } +slog = "2.5.2" + +[dev-dependencies] +tempfile = "3.1.0" diff --git a/src/trace-forwarder/Makefile b/src/trace-forwarder/Makefile new file mode 100644 index 000000000..51705a2a8 --- /dev/null +++ b/src/trace-forwarder/Makefile @@ -0,0 +1,16 @@ +# Copyright (c) 2020 Intel Corporation +# +# SPDX-License-Identifier: Apache-2.0 +# + +default: build + +build: + cargo build -v + +clean: + cargo clean + +.PHONY: \ + build \ + clean diff --git a/src/trace-forwarder/README.md b/src/trace-forwarder/README.md new file mode 100644 index 000000000..562e0f4ff --- /dev/null +++ b/src/trace-forwarder/README.md @@ -0,0 +1,23 @@ +# Trace Forwarder + +* [Overview](#overview) +* [Full details](#full-details) + +## Overview + +The Kata Containers trace forwarder, `kata-trace-forwarder`, is a component +running on the host system which is used to support tracing the agent process +which runs inside the virtual machine. + +The trace forwarder, which must be started before the agent, listens over +VSOCK for trace data sent by the agent running inside the virtual machine. The +trace spans are exported to an OpenTelemetry collector (such as Jaeger) running by +default on the host. + +## Full details + +Run: + +``` +$ cargo run -- --help +``` diff --git a/src/trace-forwarder/src/handler.rs b/src/trace-forwarder/src/handler.rs new file mode 100644 index 000000000..ced7f7aec --- /dev/null +++ b/src/trace-forwarder/src/handler.rs @@ -0,0 +1,105 @@ +// Copyright (c) 2020 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use byteorder::{ByteOrder, NetworkEndian}; +use opentelemetry::exporter::trace::SpanData; +use opentelemetry::exporter::trace::{ExportResult, SpanExporter}; +use slog::{debug, info, o, Logger}; +use std::io::{ErrorKind, Read}; +use std::net::Shutdown; +use std::sync::Arc; +use vsock::VsockStream; + +// The VSOCK "packet" protocol used comprises two elements: +// +// 1) The header (the number of bytes in the payload). +// 2) The payload bytes. +// +// This constant defines the number of bytes used to encode the header on the +// wire. In other words, the first 64-bits of the packet contain a number +// specifying how many bytes are in the remainder of the packet. +const HEADER_SIZE_BYTES: u64 = std::mem::size_of::() as u64; + +fn mk_io_err(msg: &str) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, msg.to_string()) +} + +pub fn handle_connection( + logger: Logger, + mut conn: VsockStream, + exporter: &dyn SpanExporter, +) -> Result<(), std::io::Error> { + let logger = logger.new(o!("subsystem" => "handler", + "connection" => format!("{:?}", conn))); + + debug!(logger, "handling connection"); + + handle_trace_data(logger.clone(), &mut conn, exporter) + .map_err(|e| mk_io_err(&format!("failed to handle data: {:}", e)))?; + + debug!(&logger, "handled data"); + + conn.shutdown(Shutdown::Read) + .map_err(|e| mk_io_err(&format!("shutdown failed: {:}", e)))?; + + debug!(&logger, "shutdown connection"); + + Ok(()) +} + +fn handle_trace_data( + logger: Logger, + reader: &mut dyn Read, + exporter: &dyn SpanExporter, +) -> Result<(), String> { + loop { + let mut header: [u8; HEADER_SIZE_BYTES as usize] = [0; HEADER_SIZE_BYTES as usize]; + + info!(logger, "waiting for traces"); + + match reader.read_exact(&mut header) { + Ok(_) => debug!(logger, "read header"), + Err(e) => { + if e.kind() == ErrorKind::UnexpectedEof { + info!(logger, "agent shut down"); + break; + } + + return Err(format!("failed to read header: {:}", e)); + } + }; + + let payload_len: u64 = NetworkEndian::read_u64(&header); + + let mut encoded_payload = Vec::with_capacity(payload_len as usize); + encoded_payload.resize(payload_len as usize, 0); + + reader + .read_exact(&mut encoded_payload) + .map_err(|e| format!("failed to read payload: {:}", e))?; + + debug!(logger, "read payload"); + + let span_data: SpanData = + bincode::deserialize(&encoded_payload[..]).expect("failed to deserialise payload"); + + debug!(logger, "deserialised payload"); + + let mut batch = Vec::>::new(); + + batch.push(Arc::new(span_data)); + + // Call low-level Jaeger exporter to send the trace span immediately. + let result = exporter.export(batch); + + if result != ExportResult::Success { + return Err(format!("failed to export trace spans: {:?}", result)); + } + + debug!(logger, "exported trace spans"); + } + + Ok(()) +} diff --git a/src/trace-forwarder/src/main.rs b/src/trace-forwarder/src/main.rs new file mode 100644 index 000000000..c6925e183 --- /dev/null +++ b/src/trace-forwarder/src/main.rs @@ -0,0 +1,231 @@ +// Copyright (c) 2020 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +#![warn(unused_extern_crates)] +use anyhow::{anyhow, Result}; +use clap::{crate_name, crate_version, App, Arg}; +use slog::{error, info, Logger}; +use std::env; +use std::io; +use std::process::exit; + +// Traces will be created using this program name +const DEFAULT_TRACE_NAME: &str = "kata-agent"; + +const VSOCK_CID_ANY: &str = "any"; +const ABOUT_TEXT: &str = "Kata Containers Trace Forwarder"; + +const DESCRIPTION_TEXT: &str = r#" +DESCRIPTION: + Kata Containers component that runs on the host and forwards + trace data from the container to a trace collector on the host."#; + +const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info; + +// VSOCK port this program listens to for trace data, sent by the agent. +// +// Must match the number used by the agent +const DEFAULT_KATA_VSOCK_TRACING_PORT: &str = "10240"; + +const DEFAULT_JAEGER_HOST: &str = "127.0.0.1"; +const DEFAULT_JAEGER_PORT: &str = "6831"; + +mod handler; +mod server; +mod tracer; + +fn announce(logger: &Logger, version: &str) { + let commit = env::var("VERSION_COMMIT").map_or(String::new(), |s| s); + + info!(logger, "announce"; + "commit-version" => commit.as_str(), + "version" => version); +} + +fn make_examples_text(program_name: &str) -> String { + format!( + r#"EXAMPLES: + +- Normally run on host specifying VSOCK port number + for Kata Containers agent to connect to: + + $ {program} --trace-name {trace_name:?} -p 12345 + + "#, + program = program_name, + trace_name = DEFAULT_TRACE_NAME, + ) +} + +fn real_main() -> Result<()> { + let version = crate_version!(); + let name = crate_name!(); + + let args = App::new(name) + .version(version) + .version_short("v") + .about(ABOUT_TEXT) + .long_about(DESCRIPTION_TEXT) + .after_help(&*make_examples_text(name)) + .arg( + Arg::with_name("trace-name") + .long("trace-name") + .help("Specify name for traces") + .required(false) + .takes_value(true) + .default_value(DEFAULT_TRACE_NAME), + ) + .arg( + Arg::with_name("jaeger-host") + .long("jaeger-host") + .help("Jaeger host address") + .takes_value(true) + .default_value(DEFAULT_JAEGER_HOST), + ) + .arg( + Arg::with_name("jaeger-port") + .long("jaeger-port") + .help("Jaeger port number") + .takes_value(true) + .default_value(DEFAULT_JAEGER_PORT), + ) + .arg( + Arg::with_name("log-level") + .long("log-level") + .short("l") + .help("specific log level") + .default_value(logging::slog_level_to_level_name(DEFAULT_LOG_LEVEL).unwrap()) + .possible_values(&logging::get_log_levels()) + .takes_value(true) + .required(false), + ) + .arg( + Arg::with_name("vsock-cid") + .long("vsock-cid") + .help(&format!("VSOCK CID number (or {:?})", VSOCK_CID_ANY)) + .takes_value(true) + .required(false) + .default_value(VSOCK_CID_ANY), + ) + .arg( + Arg::with_name("vsock-port") + .long("vsock-port") + .help("VSOCK port number") + .takes_value(true) + .default_value(DEFAULT_KATA_VSOCK_TRACING_PORT), + ) + .get_matches(); + + let vsock_port: u32 = args + .value_of("vsock-port") + .ok_or(anyhow!("Need VSOCK port number")) + .map_or_else( + |e| Err(anyhow!(e)), + |p| { + p.parse::() + .map_err(|e| anyhow!(format!("VSOCK port number must be an integer: {:?}", e))) + }, + )?; + + if vsock_port == 0 { + return Err(anyhow!("VSOCK port number cannot be zero")); + } + + let vsock_cid: u32 = args + .value_of("vsock-cid") + .ok_or(libc::VMADDR_CID_ANY as u32) + .map_or_else( + |e| Err(anyhow!(e)), + |c| { + if c == VSOCK_CID_ANY { + // Explicit request for "any CID" + Ok(libc::VMADDR_CID_ANY as u32) + } else { + c.parse::() + .map_err(|e| anyhow!(format!("CID number must be an integer: {:?}", e))) + } + }, + ) + .map_err(|e| anyhow!(e))?; + + if vsock_cid == 0 { + return Err(anyhow!("VSOCK CID cannot be zero")); + } + + let jaeger_port: u32 = args + .value_of("jaeger-port") + .ok_or("Need Jaeger port number") + .map(|p| p.parse::().unwrap()) + .map_err(|e| anyhow!("Jaeger port number must be an integer: {:?}", e))?; + + if jaeger_port == 0 { + return Err(anyhow!("Jaeger port number cannot be zero")); + } + + let jaeger_host = args + .value_of("jaeger-host") + .ok_or("Need Jaeger host") + .map_err(|e| anyhow!(e))?; + + if jaeger_host == "" { + return Err(anyhow!("Jaeger host cannot be blank")); + } + + // Cannot fail as a default has been specified + let log_level_name = args.value_of("log-level").unwrap(); + + let log_level = logging::level_name_to_slog_level(log_level_name).map_err(|e| anyhow!(e))?; + + // Setup logger + let writer = io::stdout(); + let logger = logging::create_logger(name, name, log_level, writer); + + announce(&logger, version); + + let trace_name: &str = args + .value_of("trace-name") + .ok_or(anyhow!("BUG: trace name not set")) + .map_or_else( + |e| Err(anyhow!(e)), + |n| { + if n == "" { + Err(anyhow!("Need non-blank trace name")) + } else { + Ok(n) + } + }, + )?; + + let mut server = server::VsockTraceServer::new( + &logger, + vsock_port, + vsock_cid, + jaeger_host, + jaeger_port, + trace_name, + ); + + let result = server.start(); + + if result.is_err() { + error!(logger, "failed"; "error" => format!("{:?}", result.err())); + } else { + info!(logger, "success"); + } + + Ok(()) +} + +fn main() { + match real_main() { + Err(e) => { + eprintln!("ERROR: {}", e); + exit(1); + } + _ => (), + }; + + exit(0); +} diff --git a/src/trace-forwarder/src/server.rs b/src/trace-forwarder/src/server.rs new file mode 100644 index 000000000..7f296c005 --- /dev/null +++ b/src/trace-forwarder/src/server.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2020 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use crate::handler; +use nix::sys::socket::{SockAddr, VsockAddr}; +use slog::{debug, error, info, o, Logger}; +use std::io; +use vsock::VsockListener; + +use crate::tracer; + +#[derive(Debug)] +pub struct VsockTraceServer { + pub vsock_port: u32, + pub vsock_cid: u32, + + pub jaeger_host: String, + pub jaeger_port: u32, + pub jaeger_service_name: String, + + pub logger: Logger, +} + +impl VsockTraceServer { + pub fn new( + logger: &Logger, + vsock_port: u32, + vsock_cid: u32, + jaeger_host: &str, + jaeger_port: u32, + jaeger_service_name: &str, + ) -> Self { + let logger = logger.new(o!("subsystem" => "server")); + + VsockTraceServer { + vsock_port: vsock_port, + vsock_cid: vsock_cid, + jaeger_host: jaeger_host.to_string(), + jaeger_port: jaeger_port, + jaeger_service_name: jaeger_service_name.to_string(), + logger: logger, + } + } + + pub fn start(&mut self) -> Result<(), io::Error> { + let vsock_addr = VsockAddr::new(self.vsock_cid, self.vsock_port); + let sock_addr = SockAddr::Vsock(vsock_addr); + + let listener = VsockListener::bind(&sock_addr)?; + + info!(self.logger, "listening for client connections"; "vsock-port" => self.vsock_port, "vsock-cid" => self.vsock_cid); + + let result = tracer::create_jaeger_trace_exporter( + self.jaeger_service_name.clone(), + self.jaeger_host.clone(), + self.jaeger_port, + ); + + let exporter = result?; + + for conn in listener.incoming() { + debug!(self.logger, "got client connection"); + + match conn { + Err(e) => { + error!(self.logger, "client connection failed"; "error" => format!("{}", e)) + } + Ok(conn) => { + debug!(self.logger, "client connection successful"); + + let logger = self.logger.new(o!()); + + handler::handle_connection(logger, conn, &exporter)?; + } + } + + debug!(self.logger, "handled client connection"); + } + + Ok(()) + } +} diff --git a/src/trace-forwarder/src/tracer.rs b/src/trace-forwarder/src/tracer.rs new file mode 100644 index 000000000..dd6ffd538 --- /dev/null +++ b/src/trace-forwarder/src/tracer.rs @@ -0,0 +1,48 @@ +// Copyright (c) 2020 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +use opentelemetry::api::Key; +use std::net::SocketAddr; + +pub fn create_jaeger_trace_exporter( + jaeger_service_name: String, + jaeger_host: String, + jaeger_port: u32, +) -> Result { + let exporter_type = "jaeger"; + + let jaeger_addr = format!("{}:{}", jaeger_host, jaeger_port); + + let process = opentelemetry_jaeger::Process { + service_name: jaeger_service_name, + tags: vec![Key::new("exporter").string(exporter_type)], + }; + + let socket_addr: SocketAddr = match jaeger_addr.parse() { + Ok(a) => a, + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("failed to parse Jaeger address: {:?}", e.to_string()), + )) + } + }; + + let exporter = match opentelemetry_jaeger::Exporter::builder() + .with_agent_endpoint(socket_addr.to_string()) + .with_process(process) + .init() + { + Ok(x) => x, + Err(e) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("failed to create exporter: {:?}", e.to_string()), + )) + } + }; + + Ok(exporter) +}