vsock-exporter: switch to tokio runtime

Make the vsock-exporter async totally using tokio runtime.
And delay the timing of the connection to trace-forwarder so that
it is easy to reconnect when the connection was broken.

Fixes: #2234

Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
Tim Zhang 2021-07-14 15:58:48 +08:00
parent 7960689ef7
commit 73d3798cb1
3 changed files with 53 additions and 47 deletions

3
src/agent/Cargo.lock generated
View File

@ -1798,7 +1798,8 @@ dependencies = [
"serde",
"slog",
"thiserror",
"vsock",
"tokio",
"tokio-vsock",
]
[[package]]

View File

@ -12,8 +12,9 @@ libc = "0.2.94"
thiserror = "1.0.24"
opentelemetry = { version = "0.14.0", features=["serialize"] }
serde = { version = "1.0.126", features = ["derive"] }
vsock = "0.2.3"
tokio-vsock = "0.3.1"
bincode = "1.3.3"
byteorder = "1.4.3"
slog = { version = "2.5.2", features = ["dynamic-keys", "max_level_trace", "release_max_level_info"] }
async-trait = "0.1.50"
tokio = "1.2.0"

View File

@ -16,11 +16,12 @@ use async_trait::async_trait;
use byteorder::{ByteOrder, NetworkEndian};
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use opentelemetry::sdk::export::ExportError;
use slog::{error, o, Logger};
use std::io::{ErrorKind, Write};
use std::net::Shutdown;
use std::sync::Mutex;
use vsock::{SockAddr, VsockStream};
use slog::{error, info, o, Logger};
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tokio_vsock::VsockStream;
const ANY_CID: &str = "any";
@ -38,7 +39,7 @@ const DEFAULT_PORT: u32 = 10240;
pub struct Exporter {
port: u32,
cid: u32,
conn: Mutex<VsockStream>,
conn: Option<Arc<Mutex<VsockStream>>>,
logger: Logger,
}
@ -70,7 +71,12 @@ fn make_io_error(desc: String) -> std::io::Error {
}
// Send a trace span to the forwarder running on the host.
fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Error> {
async fn write_span(
writer: Arc<Mutex<VsockStream>>,
span: &SpanData,
) -> Result<(), std::io::Error> {
let mut writer = writer.lock().await;
let encoded_payload: Vec<u8> =
bincode::serialize(&span).map_err(|e| make_io_error(e.to_string()))?;
@ -83,18 +89,17 @@ fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Er
NetworkEndian::write_u64(&mut payload_len_as_bytes, payload_len);
// Send the header
writer
.write_all(&payload_len_as_bytes)
.map_err(|e| make_io_error(format!("failed to write trace header: {:?}", e)))?;
writer.write_all(&payload_len_as_bytes).await?;
writer
.write_all(&encoded_payload)
.map_err(|e| make_io_error(format!("failed to write trace payload: {:?}", e)))
writer.write_all(&encoded_payload).await
}
fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
async fn handle_batch(
writer: Arc<Mutex<VsockStream>>,
batch: Vec<SpanData>,
) -> Result<(), std::io::Error> {
for span_data in batch {
write_span(writer, &span_data).map_err(Error::IOError)?;
write_span(writer.clone(), &span_data).await?;
}
Ok(())
@ -103,31 +108,32 @@ fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
#[async_trait]
impl SpanExporter for Exporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
let conn = self.conn.lock();
if self.conn.is_none() {
let conn = connect_vsock(self.cid, self.port).await.map(|e| {
error!(self.logger, "failed to obtain connection"; "error" => format!("{:?}", e));
e
})?;
match conn {
Ok(mut c) => handle_batch(&mut *c, batch),
Err(e) => {
error!(self.logger, "failed to obtain connection";
"error" => format!("{}", e));
self.conn = Some(Arc::new(Mutex::new(conn)));
}
return Err(Error::ConnectionError(e.to_string()).into());
}
handle_batch(self.conn.as_ref().unwrap().clone(), batch)
.await
.map_err(|e| {
error!(self.logger, "handle_batch error: {:?}", e);
if e.kind() == ErrorKind::NotConnected {
info!(self.logger, "drop connection");
self.conn.take();
}
Error::IOError(e)
})?;
Ok(())
}
fn shutdown(&mut self) {
let conn = match self.conn.lock() {
Ok(conn) => conn,
Err(e) => {
error!(self.logger, "failed to obtain connection";
"error" => format!("{}", e));
return;
}
};
conn.shutdown(Shutdown::Write)
.expect("failed to shutdown VSOCK connection");
self.conn.take();
}
}
@ -169,8 +175,6 @@ impl Builder {
pub fn init(self) -> Exporter {
let Builder { port, cid, logger } = self;
let sock_addr = SockAddr::new_vsock(self.cid, self.port);
let cid_str: String;
if self.cid == libc::VMADDR_CID_ANY {
@ -179,18 +183,18 @@ impl Builder {
cid_str = format!("{}", self.cid);
}
let msg = format!(
"failed to connect to VSOCK server (port: {}, cid: {}) - {}",
self.port, cid_str, "ensure trace forwarder is running on host"
);
let conn = VsockStream::connect(&sock_addr).expect(&msg);
Exporter {
port,
cid,
conn: Mutex::new(conn),
conn: None,
logger: logger.new(o!("cid" => cid_str, "port" => port)),
}
}
}
async fn connect_vsock(cid: u32, port: u32) -> Result<VsockStream, Error> {
match VsockStream::connect(cid, port).await {
Ok(conn) => Ok(conn),
Err(e) => Err(Error::ConnectionError(e.to_string())),
}
}