src: Add trace forwarder component

Add a new system component, used only when tracing is enabled. The
component listens to the agent over VSOCK, forwarding trace spans
created by the agent in the virtual machine onwards to an OpenTelemetry
collector (such as Jaeger) running on the host.

Fixes: #224.

Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
James O. D. Hunt 2020-06-04 14:17:12 +01:00
parent e5f5bc2278
commit 9b987c17d9
7 changed files with 539 additions and 0 deletions

View File

@ -0,0 +1,32 @@
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
#
[package]
name = "kata-trace-forwarder"
version = "0.0.1"
authors = ["James O. D. Hunt <james.o.hunt@intel.com>"]
edition = "2018"
[dependencies]
clap = "2.33.0"
vsock = "0.1.5"
nix = "0.15.0"
libc = "0.2.66"
serde = { version = "1.0.106", features = ["derive"] }
bincode = "1.2.1"
byteorder = "1.3.4"
serde_json = "1.0.44"
anyhow = "1.0.31"
opentelemetry = { version = "0.5.0", features=["serialize"] }
opentelemetry-jaeger = "0.4.0"
tracing-opentelemetry = "0.4.0"
tracing = "0.1.14"
tracing-subscriber = "0.2.5"
logging = { path = "../../pkg/logging" }
slog = "2.5.2"
[dev-dependencies]
tempfile = "3.1.0"

View File

@ -0,0 +1,16 @@
# Copyright (c) 2020 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
#
default: build
build:
cargo build -v
clean:
cargo clean
.PHONY: \
build \
clean

View File

@ -0,0 +1,23 @@
# Trace Forwarder
* [Overview](#overview)
* [Full details](#full-details)
## Overview
The Kata Containers trace forwarder, `kata-trace-forwarder`, is a component
running on the host system which is used to support tracing the agent process
which runs inside the virtual machine.
The trace forwarder, which must be started before the agent, listens over
VSOCK for trace data sent by the agent running inside the virtual machine. The
trace spans are exported to an OpenTelemetry collector (such as Jaeger) running by
default on the host.
## Full details
Run:
```
$ cargo run -- --help
```

View File

@ -0,0 +1,105 @@
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use byteorder::{ByteOrder, NetworkEndian};
use opentelemetry::exporter::trace::SpanData;
use opentelemetry::exporter::trace::{ExportResult, SpanExporter};
use slog::{debug, info, o, Logger};
use std::io::{ErrorKind, Read};
use std::net::Shutdown;
use std::sync::Arc;
use vsock::VsockStream;
// The VSOCK "packet" protocol used comprises two elements:
//
// 1) The header (the number of bytes in the payload).
// 2) The payload bytes.
//
// This constant defines the number of bytes used to encode the header on the
// wire. In other words, the first 64-bits of the packet contain a number
// specifying how many bytes are in the remainder of the packet.
const HEADER_SIZE_BYTES: u64 = std::mem::size_of::<u64>() as u64;
fn mk_io_err(msg: &str) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, msg.to_string())
}
pub fn handle_connection(
logger: Logger,
mut conn: VsockStream,
exporter: &dyn SpanExporter,
) -> Result<(), std::io::Error> {
let logger = logger.new(o!("subsystem" => "handler",
"connection" => format!("{:?}", conn)));
debug!(logger, "handling connection");
handle_trace_data(logger.clone(), &mut conn, exporter)
.map_err(|e| mk_io_err(&format!("failed to handle data: {:}", e)))?;
debug!(&logger, "handled data");
conn.shutdown(Shutdown::Read)
.map_err(|e| mk_io_err(&format!("shutdown failed: {:}", e)))?;
debug!(&logger, "shutdown connection");
Ok(())
}
fn handle_trace_data(
logger: Logger,
reader: &mut dyn Read,
exporter: &dyn SpanExporter,
) -> Result<(), String> {
loop {
let mut header: [u8; HEADER_SIZE_BYTES as usize] = [0; HEADER_SIZE_BYTES as usize];
info!(logger, "waiting for traces");
match reader.read_exact(&mut header) {
Ok(_) => debug!(logger, "read header"),
Err(e) => {
if e.kind() == ErrorKind::UnexpectedEof {
info!(logger, "agent shut down");
break;
}
return Err(format!("failed to read header: {:}", e));
}
};
let payload_len: u64 = NetworkEndian::read_u64(&header);
let mut encoded_payload = Vec::with_capacity(payload_len as usize);
encoded_payload.resize(payload_len as usize, 0);
reader
.read_exact(&mut encoded_payload)
.map_err(|e| format!("failed to read payload: {:}", e))?;
debug!(logger, "read payload");
let span_data: SpanData =
bincode::deserialize(&encoded_payload[..]).expect("failed to deserialise payload");
debug!(logger, "deserialised payload");
let mut batch = Vec::<Arc<SpanData>>::new();
batch.push(Arc::new(span_data));
// Call low-level Jaeger exporter to send the trace span immediately.
let result = exporter.export(batch);
if result != ExportResult::Success {
return Err(format!("failed to export trace spans: {:?}", result));
}
debug!(logger, "exported trace spans");
}
Ok(())
}

View File

@ -0,0 +1,231 @@
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
#![warn(unused_extern_crates)]
use anyhow::{anyhow, Result};
use clap::{crate_name, crate_version, App, Arg};
use slog::{error, info, Logger};
use std::env;
use std::io;
use std::process::exit;
// Traces will be created using this program name
const DEFAULT_TRACE_NAME: &str = "kata-agent";
const VSOCK_CID_ANY: &str = "any";
const ABOUT_TEXT: &str = "Kata Containers Trace Forwarder";
const DESCRIPTION_TEXT: &str = r#"
DESCRIPTION:
Kata Containers component that runs on the host and forwards
trace data from the container to a trace collector on the host."#;
const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info;
// VSOCK port this program listens to for trace data, sent by the agent.
//
// Must match the number used by the agent
const DEFAULT_KATA_VSOCK_TRACING_PORT: &str = "10240";
const DEFAULT_JAEGER_HOST: &str = "127.0.0.1";
const DEFAULT_JAEGER_PORT: &str = "6831";
mod handler;
mod server;
mod tracer;
fn announce(logger: &Logger, version: &str) {
let commit = env::var("VERSION_COMMIT").map_or(String::new(), |s| s);
info!(logger, "announce";
"commit-version" => commit.as_str(),
"version" => version);
}
fn make_examples_text(program_name: &str) -> String {
format!(
r#"EXAMPLES:
- Normally run on host specifying VSOCK port number
for Kata Containers agent to connect to:
$ {program} --trace-name {trace_name:?} -p 12345
"#,
program = program_name,
trace_name = DEFAULT_TRACE_NAME,
)
}
fn real_main() -> Result<()> {
let version = crate_version!();
let name = crate_name!();
let args = App::new(name)
.version(version)
.version_short("v")
.about(ABOUT_TEXT)
.long_about(DESCRIPTION_TEXT)
.after_help(&*make_examples_text(name))
.arg(
Arg::with_name("trace-name")
.long("trace-name")
.help("Specify name for traces")
.required(false)
.takes_value(true)
.default_value(DEFAULT_TRACE_NAME),
)
.arg(
Arg::with_name("jaeger-host")
.long("jaeger-host")
.help("Jaeger host address")
.takes_value(true)
.default_value(DEFAULT_JAEGER_HOST),
)
.arg(
Arg::with_name("jaeger-port")
.long("jaeger-port")
.help("Jaeger port number")
.takes_value(true)
.default_value(DEFAULT_JAEGER_PORT),
)
.arg(
Arg::with_name("log-level")
.long("log-level")
.short("l")
.help("specific log level")
.default_value(logging::slog_level_to_level_name(DEFAULT_LOG_LEVEL).unwrap())
.possible_values(&logging::get_log_levels())
.takes_value(true)
.required(false),
)
.arg(
Arg::with_name("vsock-cid")
.long("vsock-cid")
.help(&format!("VSOCK CID number (or {:?})", VSOCK_CID_ANY))
.takes_value(true)
.required(false)
.default_value(VSOCK_CID_ANY),
)
.arg(
Arg::with_name("vsock-port")
.long("vsock-port")
.help("VSOCK port number")
.takes_value(true)
.default_value(DEFAULT_KATA_VSOCK_TRACING_PORT),
)
.get_matches();
let vsock_port: u32 = args
.value_of("vsock-port")
.ok_or(anyhow!("Need VSOCK port number"))
.map_or_else(
|e| Err(anyhow!(e)),
|p| {
p.parse::<u32>()
.map_err(|e| anyhow!(format!("VSOCK port number must be an integer: {:?}", e)))
},
)?;
if vsock_port == 0 {
return Err(anyhow!("VSOCK port number cannot be zero"));
}
let vsock_cid: u32 = args
.value_of("vsock-cid")
.ok_or(libc::VMADDR_CID_ANY as u32)
.map_or_else(
|e| Err(anyhow!(e)),
|c| {
if c == VSOCK_CID_ANY {
// Explicit request for "any CID"
Ok(libc::VMADDR_CID_ANY as u32)
} else {
c.parse::<u32>()
.map_err(|e| anyhow!(format!("CID number must be an integer: {:?}", e)))
}
},
)
.map_err(|e| anyhow!(e))?;
if vsock_cid == 0 {
return Err(anyhow!("VSOCK CID cannot be zero"));
}
let jaeger_port: u32 = args
.value_of("jaeger-port")
.ok_or("Need Jaeger port number")
.map(|p| p.parse::<u32>().unwrap())
.map_err(|e| anyhow!("Jaeger port number must be an integer: {:?}", e))?;
if jaeger_port == 0 {
return Err(anyhow!("Jaeger port number cannot be zero"));
}
let jaeger_host = args
.value_of("jaeger-host")
.ok_or("Need Jaeger host")
.map_err(|e| anyhow!(e))?;
if jaeger_host == "" {
return Err(anyhow!("Jaeger host cannot be blank"));
}
// Cannot fail as a default has been specified
let log_level_name = args.value_of("log-level").unwrap();
let log_level = logging::level_name_to_slog_level(log_level_name).map_err(|e| anyhow!(e))?;
// Setup logger
let writer = io::stdout();
let logger = logging::create_logger(name, name, log_level, writer);
announce(&logger, version);
let trace_name: &str = args
.value_of("trace-name")
.ok_or(anyhow!("BUG: trace name not set"))
.map_or_else(
|e| Err(anyhow!(e)),
|n| {
if n == "" {
Err(anyhow!("Need non-blank trace name"))
} else {
Ok(n)
}
},
)?;
let mut server = server::VsockTraceServer::new(
&logger,
vsock_port,
vsock_cid,
jaeger_host,
jaeger_port,
trace_name,
);
let result = server.start();
if result.is_err() {
error!(logger, "failed"; "error" => format!("{:?}", result.err()));
} else {
info!(logger, "success");
}
Ok(())
}
fn main() {
match real_main() {
Err(e) => {
eprintln!("ERROR: {}", e);
exit(1);
}
_ => (),
};
exit(0);
}

View File

@ -0,0 +1,84 @@
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::handler;
use nix::sys::socket::{SockAddr, VsockAddr};
use slog::{debug, error, info, o, Logger};
use std::io;
use vsock::VsockListener;
use crate::tracer;
#[derive(Debug)]
pub struct VsockTraceServer {
pub vsock_port: u32,
pub vsock_cid: u32,
pub jaeger_host: String,
pub jaeger_port: u32,
pub jaeger_service_name: String,
pub logger: Logger,
}
impl VsockTraceServer {
pub fn new(
logger: &Logger,
vsock_port: u32,
vsock_cid: u32,
jaeger_host: &str,
jaeger_port: u32,
jaeger_service_name: &str,
) -> Self {
let logger = logger.new(o!("subsystem" => "server"));
VsockTraceServer {
vsock_port: vsock_port,
vsock_cid: vsock_cid,
jaeger_host: jaeger_host.to_string(),
jaeger_port: jaeger_port,
jaeger_service_name: jaeger_service_name.to_string(),
logger: logger,
}
}
pub fn start(&mut self) -> Result<(), io::Error> {
let vsock_addr = VsockAddr::new(self.vsock_cid, self.vsock_port);
let sock_addr = SockAddr::Vsock(vsock_addr);
let listener = VsockListener::bind(&sock_addr)?;
info!(self.logger, "listening for client connections"; "vsock-port" => self.vsock_port, "vsock-cid" => self.vsock_cid);
let result = tracer::create_jaeger_trace_exporter(
self.jaeger_service_name.clone(),
self.jaeger_host.clone(),
self.jaeger_port,
);
let exporter = result?;
for conn in listener.incoming() {
debug!(self.logger, "got client connection");
match conn {
Err(e) => {
error!(self.logger, "client connection failed"; "error" => format!("{}", e))
}
Ok(conn) => {
debug!(self.logger, "client connection successful");
let logger = self.logger.new(o!());
handler::handle_connection(logger, conn, &exporter)?;
}
}
debug!(self.logger, "handled client connection");
}
Ok(())
}
}

View File

@ -0,0 +1,48 @@
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
use opentelemetry::api::Key;
use std::net::SocketAddr;
pub fn create_jaeger_trace_exporter(
jaeger_service_name: String,
jaeger_host: String,
jaeger_port: u32,
) -> Result<opentelemetry_jaeger::Exporter, std::io::Error> {
let exporter_type = "jaeger";
let jaeger_addr = format!("{}:{}", jaeger_host, jaeger_port);
let process = opentelemetry_jaeger::Process {
service_name: jaeger_service_name,
tags: vec![Key::new("exporter").string(exporter_type)],
};
let socket_addr: SocketAddr = match jaeger_addr.parse() {
Ok(a) => a,
Err(e) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("failed to parse Jaeger address: {:?}", e.to_string()),
))
}
};
let exporter = match opentelemetry_jaeger::Exporter::builder()
.with_agent_endpoint(socket_addr.to_string())
.with_process(process)
.init()
{
Ok(x) => x,
Err(e) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("failed to create exporter: {:?}", e.to_string()),
))
}
};
Ok(exporter)
}