mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-09-18 15:28:10 +00:00
Merge pull request #225 from jodh-intel/2.0-dev-trace-forwarder
src: Add trace forwarder component
This commit is contained in:
@@ -95,6 +95,11 @@ inside the virtual machine). This shim is required to be compliant with the
|
||||
expectations of the [OCI runtime
|
||||
specification](https://github.com/opencontainers/runtime-spec).
|
||||
|
||||
##### Trace forwarder
|
||||
|
||||
The [`kata-trace-forwarder`](src/trace-forwarder) is a component only used
|
||||
when tracing the [agent](#agent) process.
|
||||
|
||||
#### Additional
|
||||
|
||||
##### Hypervisor
|
||||
|
32
src/trace-forwarder/Cargo.toml
Normal file
32
src/trace-forwarder/Cargo.toml
Normal file
@@ -0,0 +1,32 @@
|
||||
# Copyright (c) 2020 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
[package]
|
||||
name = "kata-trace-forwarder"
|
||||
version = "0.0.1"
|
||||
authors = ["James O. D. Hunt <james.o.hunt@intel.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
clap = "2.33.0"
|
||||
vsock = "0.1.5"
|
||||
nix = "0.15.0"
|
||||
libc = "0.2.66"
|
||||
serde = { version = "1.0.106", features = ["derive"] }
|
||||
bincode = "1.2.1"
|
||||
byteorder = "1.3.4"
|
||||
serde_json = "1.0.44"
|
||||
anyhow = "1.0.31"
|
||||
opentelemetry = { version = "0.5.0", features=["serialize"] }
|
||||
opentelemetry-jaeger = "0.4.0"
|
||||
tracing-opentelemetry = "0.4.0"
|
||||
tracing = "0.1.14"
|
||||
tracing-subscriber = "0.2.5"
|
||||
|
||||
logging = { path = "../../pkg/logging" }
|
||||
slog = "2.5.2"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.1.0"
|
16
src/trace-forwarder/Makefile
Normal file
16
src/trace-forwarder/Makefile
Normal file
@@ -0,0 +1,16 @@
|
||||
# Copyright (c) 2020 Intel Corporation
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
default: build
|
||||
|
||||
build:
|
||||
cargo build -v
|
||||
|
||||
clean:
|
||||
cargo clean
|
||||
|
||||
.PHONY: \
|
||||
build \
|
||||
clean
|
23
src/trace-forwarder/README.md
Normal file
23
src/trace-forwarder/README.md
Normal file
@@ -0,0 +1,23 @@
|
||||
# Trace Forwarder
|
||||
|
||||
* [Overview](#overview)
|
||||
* [Full details](#full-details)
|
||||
|
||||
## Overview
|
||||
|
||||
The Kata Containers trace forwarder, `kata-trace-forwarder`, is a component
|
||||
running on the host system which is used to support tracing the agent process
|
||||
which runs inside the virtual machine.
|
||||
|
||||
The trace forwarder, which must be started before the agent, listens over
|
||||
VSOCK for trace data sent by the agent running inside the virtual machine. The
|
||||
trace spans are exported to an OpenTelemetry collector (such as Jaeger) running by
|
||||
default on the host.
|
||||
|
||||
## Full details
|
||||
|
||||
Run:
|
||||
|
||||
```
|
||||
$ cargo run -- --help
|
||||
```
|
105
src/trace-forwarder/src/handler.rs
Normal file
105
src/trace-forwarder/src/handler.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
// Copyright (c) 2020 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use byteorder::{ByteOrder, NetworkEndian};
|
||||
use opentelemetry::exporter::trace::SpanData;
|
||||
use opentelemetry::exporter::trace::{ExportResult, SpanExporter};
|
||||
use slog::{debug, info, o, Logger};
|
||||
use std::io::{ErrorKind, Read};
|
||||
use std::net::Shutdown;
|
||||
use std::sync::Arc;
|
||||
use vsock::VsockStream;
|
||||
|
||||
// The VSOCK "packet" protocol used comprises two elements:
|
||||
//
|
||||
// 1) The header (the number of bytes in the payload).
|
||||
// 2) The payload bytes.
|
||||
//
|
||||
// This constant defines the number of bytes used to encode the header on the
|
||||
// wire. In other words, the first 64-bits of the packet contain a number
|
||||
// specifying how many bytes are in the remainder of the packet.
|
||||
const HEADER_SIZE_BYTES: u64 = std::mem::size_of::<u64>() as u64;
|
||||
|
||||
/// Build a generic (`ErrorKind::Other`) I/O error carrying `msg` as its
/// message.
fn mk_io_err(msg: &str) -> std::io::Error {
    let kind = std::io::ErrorKind::Other;

    std::io::Error::new(kind, msg.to_owned())
}
|
||||
|
||||
pub fn handle_connection(
|
||||
logger: Logger,
|
||||
mut conn: VsockStream,
|
||||
exporter: &dyn SpanExporter,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let logger = logger.new(o!("subsystem" => "handler",
|
||||
"connection" => format!("{:?}", conn)));
|
||||
|
||||
debug!(logger, "handling connection");
|
||||
|
||||
handle_trace_data(logger.clone(), &mut conn, exporter)
|
||||
.map_err(|e| mk_io_err(&format!("failed to handle data: {:}", e)))?;
|
||||
|
||||
debug!(&logger, "handled data");
|
||||
|
||||
conn.shutdown(Shutdown::Read)
|
||||
.map_err(|e| mk_io_err(&format!("shutdown failed: {:}", e)))?;
|
||||
|
||||
debug!(&logger, "shutdown connection");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_trace_data(
|
||||
logger: Logger,
|
||||
reader: &mut dyn Read,
|
||||
exporter: &dyn SpanExporter,
|
||||
) -> Result<(), String> {
|
||||
loop {
|
||||
let mut header: [u8; HEADER_SIZE_BYTES as usize] = [0; HEADER_SIZE_BYTES as usize];
|
||||
|
||||
info!(logger, "waiting for traces");
|
||||
|
||||
match reader.read_exact(&mut header) {
|
||||
Ok(_) => debug!(logger, "read header"),
|
||||
Err(e) => {
|
||||
if e.kind() == ErrorKind::UnexpectedEof {
|
||||
info!(logger, "agent shut down");
|
||||
break;
|
||||
}
|
||||
|
||||
return Err(format!("failed to read header: {:}", e));
|
||||
}
|
||||
};
|
||||
|
||||
let payload_len: u64 = NetworkEndian::read_u64(&header);
|
||||
|
||||
let mut encoded_payload = Vec::with_capacity(payload_len as usize);
|
||||
encoded_payload.resize(payload_len as usize, 0);
|
||||
|
||||
reader
|
||||
.read_exact(&mut encoded_payload)
|
||||
.map_err(|e| format!("failed to read payload: {:}", e))?;
|
||||
|
||||
debug!(logger, "read payload");
|
||||
|
||||
let span_data: SpanData =
|
||||
bincode::deserialize(&encoded_payload[..]).expect("failed to deserialise payload");
|
||||
|
||||
debug!(logger, "deserialised payload");
|
||||
|
||||
let mut batch = Vec::<Arc<SpanData>>::new();
|
||||
|
||||
batch.push(Arc::new(span_data));
|
||||
|
||||
// Call low-level Jaeger exporter to send the trace span immediately.
|
||||
let result = exporter.export(batch);
|
||||
|
||||
if result != ExportResult::Success {
|
||||
return Err(format!("failed to export trace spans: {:?}", result));
|
||||
}
|
||||
|
||||
debug!(logger, "exported trace spans");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
231
src/trace-forwarder/src/main.rs
Normal file
231
src/trace-forwarder/src/main.rs
Normal file
@@ -0,0 +1,231 @@
|
||||
// Copyright (c) 2020 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
#![warn(unused_extern_crates)]
|
||||
use anyhow::{anyhow, Result};
|
||||
use clap::{crate_name, crate_version, App, Arg};
|
||||
use slog::{error, info, Logger};
|
||||
use std::env;
|
||||
use std::io;
|
||||
use std::process::exit;
|
||||
|
||||
// Traces will be created using this program name
|
||||
const DEFAULT_TRACE_NAME: &str = "kata-agent";
|
||||
|
||||
const VSOCK_CID_ANY: &str = "any";
|
||||
const ABOUT_TEXT: &str = "Kata Containers Trace Forwarder";
|
||||
|
||||
const DESCRIPTION_TEXT: &str = r#"
|
||||
DESCRIPTION:
|
||||
Kata Containers component that runs on the host and forwards
|
||||
trace data from the container to a trace collector on the host."#;
|
||||
|
||||
const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info;
|
||||
|
||||
// VSOCK port this program listens to for trace data, sent by the agent.
|
||||
//
|
||||
// Must match the number used by the agent
|
||||
const DEFAULT_KATA_VSOCK_TRACING_PORT: &str = "10240";
|
||||
|
||||
const DEFAULT_JAEGER_HOST: &str = "127.0.0.1";
|
||||
const DEFAULT_JAEGER_PORT: &str = "6831";
|
||||
|
||||
mod handler;
|
||||
mod server;
|
||||
mod tracer;
|
||||
|
||||
fn announce(logger: &Logger, version: &str) {
|
||||
let commit = env::var("VERSION_COMMIT").map_or(String::new(), |s| s);
|
||||
|
||||
info!(logger, "announce";
|
||||
"commit-version" => commit.as_str(),
|
||||
"version" => version);
|
||||
}
|
||||
|
||||
fn make_examples_text(program_name: &str) -> String {
|
||||
format!(
|
||||
r#"EXAMPLES:
|
||||
|
||||
- Normally run on host specifying VSOCK port number
|
||||
for Kata Containers agent to connect to:
|
||||
|
||||
$ {program} --trace-name {trace_name:?} -p 12345
|
||||
|
||||
"#,
|
||||
program = program_name,
|
||||
trace_name = DEFAULT_TRACE_NAME,
|
||||
)
|
||||
}
|
||||
|
||||
fn real_main() -> Result<()> {
|
||||
let version = crate_version!();
|
||||
let name = crate_name!();
|
||||
|
||||
let args = App::new(name)
|
||||
.version(version)
|
||||
.version_short("v")
|
||||
.about(ABOUT_TEXT)
|
||||
.long_about(DESCRIPTION_TEXT)
|
||||
.after_help(&*make_examples_text(name))
|
||||
.arg(
|
||||
Arg::with_name("trace-name")
|
||||
.long("trace-name")
|
||||
.help("Specify name for traces")
|
||||
.required(false)
|
||||
.takes_value(true)
|
||||
.default_value(DEFAULT_TRACE_NAME),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("jaeger-host")
|
||||
.long("jaeger-host")
|
||||
.help("Jaeger host address")
|
||||
.takes_value(true)
|
||||
.default_value(DEFAULT_JAEGER_HOST),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("jaeger-port")
|
||||
.long("jaeger-port")
|
||||
.help("Jaeger port number")
|
||||
.takes_value(true)
|
||||
.default_value(DEFAULT_JAEGER_PORT),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("log-level")
|
||||
.long("log-level")
|
||||
.short("l")
|
||||
.help("specific log level")
|
||||
.default_value(logging::slog_level_to_level_name(DEFAULT_LOG_LEVEL).unwrap())
|
||||
.possible_values(&logging::get_log_levels())
|
||||
.takes_value(true)
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("vsock-cid")
|
||||
.long("vsock-cid")
|
||||
.help(&format!("VSOCK CID number (or {:?})", VSOCK_CID_ANY))
|
||||
.takes_value(true)
|
||||
.required(false)
|
||||
.default_value(VSOCK_CID_ANY),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("vsock-port")
|
||||
.long("vsock-port")
|
||||
.help("VSOCK port number")
|
||||
.takes_value(true)
|
||||
.default_value(DEFAULT_KATA_VSOCK_TRACING_PORT),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let vsock_port: u32 = args
|
||||
.value_of("vsock-port")
|
||||
.ok_or(anyhow!("Need VSOCK port number"))
|
||||
.map_or_else(
|
||||
|e| Err(anyhow!(e)),
|
||||
|p| {
|
||||
p.parse::<u32>()
|
||||
.map_err(|e| anyhow!(format!("VSOCK port number must be an integer: {:?}", e)))
|
||||
},
|
||||
)?;
|
||||
|
||||
if vsock_port == 0 {
|
||||
return Err(anyhow!("VSOCK port number cannot be zero"));
|
||||
}
|
||||
|
||||
let vsock_cid: u32 = args
|
||||
.value_of("vsock-cid")
|
||||
.ok_or(libc::VMADDR_CID_ANY as u32)
|
||||
.map_or_else(
|
||||
|e| Err(anyhow!(e)),
|
||||
|c| {
|
||||
if c == VSOCK_CID_ANY {
|
||||
// Explicit request for "any CID"
|
||||
Ok(libc::VMADDR_CID_ANY as u32)
|
||||
} else {
|
||||
c.parse::<u32>()
|
||||
.map_err(|e| anyhow!(format!("CID number must be an integer: {:?}", e)))
|
||||
}
|
||||
},
|
||||
)
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
if vsock_cid == 0 {
|
||||
return Err(anyhow!("VSOCK CID cannot be zero"));
|
||||
}
|
||||
|
||||
let jaeger_port: u32 = args
|
||||
.value_of("jaeger-port")
|
||||
.ok_or("Need Jaeger port number")
|
||||
.map(|p| p.parse::<u32>().unwrap())
|
||||
.map_err(|e| anyhow!("Jaeger port number must be an integer: {:?}", e))?;
|
||||
|
||||
if jaeger_port == 0 {
|
||||
return Err(anyhow!("Jaeger port number cannot be zero"));
|
||||
}
|
||||
|
||||
let jaeger_host = args
|
||||
.value_of("jaeger-host")
|
||||
.ok_or("Need Jaeger host")
|
||||
.map_err(|e| anyhow!(e))?;
|
||||
|
||||
if jaeger_host == "" {
|
||||
return Err(anyhow!("Jaeger host cannot be blank"));
|
||||
}
|
||||
|
||||
// Cannot fail as a default has been specified
|
||||
let log_level_name = args.value_of("log-level").unwrap();
|
||||
|
||||
let log_level = logging::level_name_to_slog_level(log_level_name).map_err(|e| anyhow!(e))?;
|
||||
|
||||
// Setup logger
|
||||
let writer = io::stdout();
|
||||
let logger = logging::create_logger(name, name, log_level, writer);
|
||||
|
||||
announce(&logger, version);
|
||||
|
||||
let trace_name: &str = args
|
||||
.value_of("trace-name")
|
||||
.ok_or(anyhow!("BUG: trace name not set"))
|
||||
.map_or_else(
|
||||
|e| Err(anyhow!(e)),
|
||||
|n| {
|
||||
if n == "" {
|
||||
Err(anyhow!("Need non-blank trace name"))
|
||||
} else {
|
||||
Ok(n)
|
||||
}
|
||||
},
|
||||
)?;
|
||||
|
||||
let mut server = server::VsockTraceServer::new(
|
||||
&logger,
|
||||
vsock_port,
|
||||
vsock_cid,
|
||||
jaeger_host,
|
||||
jaeger_port,
|
||||
trace_name,
|
||||
);
|
||||
|
||||
let result = server.start();
|
||||
|
||||
if result.is_err() {
|
||||
error!(logger, "failed"; "error" => format!("{:?}", result.err()));
|
||||
} else {
|
||||
info!(logger, "success");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
match real_main() {
|
||||
Err(e) => {
|
||||
eprintln!("ERROR: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
|
||||
exit(0);
|
||||
}
|
84
src/trace-forwarder/src/server.rs
Normal file
84
src/trace-forwarder/src/server.rs
Normal file
@@ -0,0 +1,84 @@
|
||||
// Copyright (c) 2020 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use crate::handler;
|
||||
use nix::sys::socket::{SockAddr, VsockAddr};
|
||||
use slog::{debug, error, info, o, Logger};
|
||||
use std::io;
|
||||
use vsock::VsockListener;
|
||||
|
||||
use crate::tracer;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VsockTraceServer {
|
||||
pub vsock_port: u32,
|
||||
pub vsock_cid: u32,
|
||||
|
||||
pub jaeger_host: String,
|
||||
pub jaeger_port: u32,
|
||||
pub jaeger_service_name: String,
|
||||
|
||||
pub logger: Logger,
|
||||
}
|
||||
|
||||
impl VsockTraceServer {
|
||||
pub fn new(
|
||||
logger: &Logger,
|
||||
vsock_port: u32,
|
||||
vsock_cid: u32,
|
||||
jaeger_host: &str,
|
||||
jaeger_port: u32,
|
||||
jaeger_service_name: &str,
|
||||
) -> Self {
|
||||
let logger = logger.new(o!("subsystem" => "server"));
|
||||
|
||||
VsockTraceServer {
|
||||
vsock_port: vsock_port,
|
||||
vsock_cid: vsock_cid,
|
||||
jaeger_host: jaeger_host.to_string(),
|
||||
jaeger_port: jaeger_port,
|
||||
jaeger_service_name: jaeger_service_name.to_string(),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&mut self) -> Result<(), io::Error> {
|
||||
let vsock_addr = VsockAddr::new(self.vsock_cid, self.vsock_port);
|
||||
let sock_addr = SockAddr::Vsock(vsock_addr);
|
||||
|
||||
let listener = VsockListener::bind(&sock_addr)?;
|
||||
|
||||
info!(self.logger, "listening for client connections"; "vsock-port" => self.vsock_port, "vsock-cid" => self.vsock_cid);
|
||||
|
||||
let result = tracer::create_jaeger_trace_exporter(
|
||||
self.jaeger_service_name.clone(),
|
||||
self.jaeger_host.clone(),
|
||||
self.jaeger_port,
|
||||
);
|
||||
|
||||
let exporter = result?;
|
||||
|
||||
for conn in listener.incoming() {
|
||||
debug!(self.logger, "got client connection");
|
||||
|
||||
match conn {
|
||||
Err(e) => {
|
||||
error!(self.logger, "client connection failed"; "error" => format!("{}", e))
|
||||
}
|
||||
Ok(conn) => {
|
||||
debug!(self.logger, "client connection successful");
|
||||
|
||||
let logger = self.logger.new(o!());
|
||||
|
||||
handler::handle_connection(logger, conn, &exporter)?;
|
||||
}
|
||||
}
|
||||
|
||||
debug!(self.logger, "handled client connection");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
48
src/trace-forwarder/src/tracer.rs
Normal file
48
src/trace-forwarder/src/tracer.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
// Copyright (c) 2020 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use opentelemetry::api::Key;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
pub fn create_jaeger_trace_exporter(
|
||||
jaeger_service_name: String,
|
||||
jaeger_host: String,
|
||||
jaeger_port: u32,
|
||||
) -> Result<opentelemetry_jaeger::Exporter, std::io::Error> {
|
||||
let exporter_type = "jaeger";
|
||||
|
||||
let jaeger_addr = format!("{}:{}", jaeger_host, jaeger_port);
|
||||
|
||||
let process = opentelemetry_jaeger::Process {
|
||||
service_name: jaeger_service_name,
|
||||
tags: vec![Key::new("exporter").string(exporter_type)],
|
||||
};
|
||||
|
||||
let socket_addr: SocketAddr = match jaeger_addr.parse() {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("failed to parse Jaeger address: {:?}", e.to_string()),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let exporter = match opentelemetry_jaeger::Exporter::builder()
|
||||
.with_agent_endpoint(socket_addr.to_string())
|
||||
.with_process(process)
|
||||
.init()
|
||||
{
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("failed to create exporter: {:?}", e.to_string()),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(exporter)
|
||||
}
|
Reference in New Issue
Block a user