mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 15:02:45 +00:00
vsock-exporter: switch to tokio runtime
Make the vsock-exporter async totally using tokio runtime. And delay the timing of the connection to trace-forwarder so that it is easy to reconnect when the connection was broken. Fixes: #2234 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
7960689ef7
commit
73d3798cb1
3
src/agent/Cargo.lock
generated
3
src/agent/Cargo.lock
generated
@ -1798,7 +1798,8 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
"slog",
|
"slog",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"vsock",
|
"tokio",
|
||||||
|
"tokio-vsock",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -12,8 +12,9 @@ libc = "0.2.94"
|
|||||||
thiserror = "1.0.24"
|
thiserror = "1.0.24"
|
||||||
opentelemetry = { version = "0.14.0", features=["serialize"] }
|
opentelemetry = { version = "0.14.0", features=["serialize"] }
|
||||||
serde = { version = "1.0.126", features = ["derive"] }
|
serde = { version = "1.0.126", features = ["derive"] }
|
||||||
vsock = "0.2.3"
|
tokio-vsock = "0.3.1"
|
||||||
bincode = "1.3.3"
|
bincode = "1.3.3"
|
||||||
byteorder = "1.4.3"
|
byteorder = "1.4.3"
|
||||||
slog = { version = "2.5.2", features = ["dynamic-keys", "max_level_trace", "release_max_level_info"] }
|
slog = { version = "2.5.2", features = ["dynamic-keys", "max_level_trace", "release_max_level_info"] }
|
||||||
async-trait = "0.1.50"
|
async-trait = "0.1.50"
|
||||||
|
tokio = "1.2.0"
|
||||||
|
@ -16,11 +16,12 @@ use async_trait::async_trait;
|
|||||||
use byteorder::{ByteOrder, NetworkEndian};
|
use byteorder::{ByteOrder, NetworkEndian};
|
||||||
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
|
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
|
||||||
use opentelemetry::sdk::export::ExportError;
|
use opentelemetry::sdk::export::ExportError;
|
||||||
use slog::{error, o, Logger};
|
use slog::{error, info, o, Logger};
|
||||||
use std::io::{ErrorKind, Write};
|
use std::io::ErrorKind;
|
||||||
use std::net::Shutdown;
|
use std::sync::Arc;
|
||||||
use std::sync::Mutex;
|
use tokio::io::AsyncWriteExt;
|
||||||
use vsock::{SockAddr, VsockStream};
|
use tokio::sync::Mutex;
|
||||||
|
use tokio_vsock::VsockStream;
|
||||||
|
|
||||||
const ANY_CID: &str = "any";
|
const ANY_CID: &str = "any";
|
||||||
|
|
||||||
@ -38,7 +39,7 @@ const DEFAULT_PORT: u32 = 10240;
|
|||||||
pub struct Exporter {
|
pub struct Exporter {
|
||||||
port: u32,
|
port: u32,
|
||||||
cid: u32,
|
cid: u32,
|
||||||
conn: Mutex<VsockStream>,
|
conn: Option<Arc<Mutex<VsockStream>>>,
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,7 +71,12 @@ fn make_io_error(desc: String) -> std::io::Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send a trace span to the forwarder running on the host.
|
// Send a trace span to the forwarder running on the host.
|
||||||
fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Error> {
|
async fn write_span(
|
||||||
|
writer: Arc<Mutex<VsockStream>>,
|
||||||
|
span: &SpanData,
|
||||||
|
) -> Result<(), std::io::Error> {
|
||||||
|
let mut writer = writer.lock().await;
|
||||||
|
|
||||||
let encoded_payload: Vec<u8> =
|
let encoded_payload: Vec<u8> =
|
||||||
bincode::serialize(&span).map_err(|e| make_io_error(e.to_string()))?;
|
bincode::serialize(&span).map_err(|e| make_io_error(e.to_string()))?;
|
||||||
|
|
||||||
@ -83,18 +89,17 @@ fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Er
|
|||||||
NetworkEndian::write_u64(&mut payload_len_as_bytes, payload_len);
|
NetworkEndian::write_u64(&mut payload_len_as_bytes, payload_len);
|
||||||
|
|
||||||
// Send the header
|
// Send the header
|
||||||
writer
|
writer.write_all(&payload_len_as_bytes).await?;
|
||||||
.write_all(&payload_len_as_bytes)
|
|
||||||
.map_err(|e| make_io_error(format!("failed to write trace header: {:?}", e)))?;
|
|
||||||
|
|
||||||
writer
|
writer.write_all(&encoded_payload).await
|
||||||
.write_all(&encoded_payload)
|
|
||||||
.map_err(|e| make_io_error(format!("failed to write trace payload: {:?}", e)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
|
async fn handle_batch(
|
||||||
|
writer: Arc<Mutex<VsockStream>>,
|
||||||
|
batch: Vec<SpanData>,
|
||||||
|
) -> Result<(), std::io::Error> {
|
||||||
for span_data in batch {
|
for span_data in batch {
|
||||||
write_span(writer, &span_data).map_err(Error::IOError)?;
|
write_span(writer.clone(), &span_data).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -103,31 +108,32 @@ fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl SpanExporter for Exporter {
|
impl SpanExporter for Exporter {
|
||||||
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
|
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
|
||||||
let conn = self.conn.lock();
|
if self.conn.is_none() {
|
||||||
|
let conn = connect_vsock(self.cid, self.port).await.map(|e| {
|
||||||
|
error!(self.logger, "failed to obtain connection"; "error" => format!("{:?}", e));
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
match conn {
|
self.conn = Some(Arc::new(Mutex::new(conn)));
|
||||||
Ok(mut c) => handle_batch(&mut *c, batch),
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.logger, "failed to obtain connection";
|
|
||||||
"error" => format!("{}", e));
|
|
||||||
|
|
||||||
return Err(Error::ConnectionError(e.to_string()).into());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handle_batch(self.conn.as_ref().unwrap().clone(), batch)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
error!(self.logger, "handle_batch error: {:?}", e);
|
||||||
|
if e.kind() == ErrorKind::NotConnected {
|
||||||
|
info!(self.logger, "drop connection");
|
||||||
|
self.conn.take();
|
||||||
|
}
|
||||||
|
|
||||||
|
Error::IOError(e)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shutdown(&mut self) {
|
fn shutdown(&mut self) {
|
||||||
let conn = match self.conn.lock() {
|
self.conn.take();
|
||||||
Ok(conn) => conn,
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.logger, "failed to obtain connection";
|
|
||||||
"error" => format!("{}", e));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
conn.shutdown(Shutdown::Write)
|
|
||||||
.expect("failed to shutdown VSOCK connection");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,8 +175,6 @@ impl Builder {
|
|||||||
pub fn init(self) -> Exporter {
|
pub fn init(self) -> Exporter {
|
||||||
let Builder { port, cid, logger } = self;
|
let Builder { port, cid, logger } = self;
|
||||||
|
|
||||||
let sock_addr = SockAddr::new_vsock(self.cid, self.port);
|
|
||||||
|
|
||||||
let cid_str: String;
|
let cid_str: String;
|
||||||
|
|
||||||
if self.cid == libc::VMADDR_CID_ANY {
|
if self.cid == libc::VMADDR_CID_ANY {
|
||||||
@ -179,18 +183,18 @@ impl Builder {
|
|||||||
cid_str = format!("{}", self.cid);
|
cid_str = format!("{}", self.cid);
|
||||||
}
|
}
|
||||||
|
|
||||||
let msg = format!(
|
|
||||||
"failed to connect to VSOCK server (port: {}, cid: {}) - {}",
|
|
||||||
self.port, cid_str, "ensure trace forwarder is running on host"
|
|
||||||
);
|
|
||||||
|
|
||||||
let conn = VsockStream::connect(&sock_addr).expect(&msg);
|
|
||||||
|
|
||||||
Exporter {
|
Exporter {
|
||||||
port,
|
port,
|
||||||
cid,
|
cid,
|
||||||
conn: Mutex::new(conn),
|
conn: None,
|
||||||
logger: logger.new(o!("cid" => cid_str, "port" => port)),
|
logger: logger.new(o!("cid" => cid_str, "port" => port)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn connect_vsock(cid: u32, port: u32) -> Result<VsockStream, Error> {
|
||||||
|
match VsockStream::connect(cid, port).await {
|
||||||
|
Ok(conn) => Ok(conn),
|
||||||
|
Err(e) => Err(Error::ConnectionError(e.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user