mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 06:52:13 +00:00
vsock-exporter: switch to tokio runtime
Make the vsock-exporter async totally using tokio runtime. And delay the timing of the connection to trace-forwarder so that it is easy to reconnect when the connection was broken. Fixes: #2234 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
7960689ef7
commit
73d3798cb1
3
src/agent/Cargo.lock
generated
3
src/agent/Cargo.lock
generated
@ -1798,7 +1798,8 @@ dependencies = [
|
||||
"serde",
|
||||
"slog",
|
||||
"thiserror",
|
||||
"vsock",
|
||||
"tokio",
|
||||
"tokio-vsock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -12,8 +12,9 @@ libc = "0.2.94"
|
||||
thiserror = "1.0.24"
|
||||
opentelemetry = { version = "0.14.0", features=["serialize"] }
|
||||
serde = { version = "1.0.126", features = ["derive"] }
|
||||
vsock = "0.2.3"
|
||||
tokio-vsock = "0.3.1"
|
||||
bincode = "1.3.3"
|
||||
byteorder = "1.4.3"
|
||||
slog = { version = "2.5.2", features = ["dynamic-keys", "max_level_trace", "release_max_level_info"] }
|
||||
async-trait = "0.1.50"
|
||||
tokio = "1.2.0"
|
||||
|
@ -16,11 +16,12 @@ use async_trait::async_trait;
|
||||
use byteorder::{ByteOrder, NetworkEndian};
|
||||
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
|
||||
use opentelemetry::sdk::export::ExportError;
|
||||
use slog::{error, o, Logger};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::net::Shutdown;
|
||||
use std::sync::Mutex;
|
||||
use vsock::{SockAddr, VsockStream};
|
||||
use slog::{error, info, o, Logger};
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_vsock::VsockStream;
|
||||
|
||||
const ANY_CID: &str = "any";
|
||||
|
||||
@ -38,7 +39,7 @@ const DEFAULT_PORT: u32 = 10240;
|
||||
pub struct Exporter {
|
||||
port: u32,
|
||||
cid: u32,
|
||||
conn: Mutex<VsockStream>,
|
||||
conn: Option<Arc<Mutex<VsockStream>>>,
|
||||
logger: Logger,
|
||||
}
|
||||
|
||||
@ -70,7 +71,12 @@ fn make_io_error(desc: String) -> std::io::Error {
|
||||
}
|
||||
|
||||
// Send a trace span to the forwarder running on the host.
|
||||
fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Error> {
|
||||
async fn write_span(
|
||||
writer: Arc<Mutex<VsockStream>>,
|
||||
span: &SpanData,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let mut writer = writer.lock().await;
|
||||
|
||||
let encoded_payload: Vec<u8> =
|
||||
bincode::serialize(&span).map_err(|e| make_io_error(e.to_string()))?;
|
||||
|
||||
@ -83,18 +89,17 @@ fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Er
|
||||
NetworkEndian::write_u64(&mut payload_len_as_bytes, payload_len);
|
||||
|
||||
// Send the header
|
||||
writer
|
||||
.write_all(&payload_len_as_bytes)
|
||||
.map_err(|e| make_io_error(format!("failed to write trace header: {:?}", e)))?;
|
||||
writer.write_all(&payload_len_as_bytes).await?;
|
||||
|
||||
writer
|
||||
.write_all(&encoded_payload)
|
||||
.map_err(|e| make_io_error(format!("failed to write trace payload: {:?}", e)))
|
||||
writer.write_all(&encoded_payload).await
|
||||
}
|
||||
|
||||
fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
|
||||
async fn handle_batch(
|
||||
writer: Arc<Mutex<VsockStream>>,
|
||||
batch: Vec<SpanData>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
for span_data in batch {
|
||||
write_span(writer, &span_data).map_err(Error::IOError)?;
|
||||
write_span(writer.clone(), &span_data).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -103,31 +108,32 @@ fn handle_batch(writer: &mut dyn Write, batch: Vec<SpanData>) -> ExportResult {
|
||||
#[async_trait]
|
||||
impl SpanExporter for Exporter {
|
||||
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
|
||||
let conn = self.conn.lock();
|
||||
if self.conn.is_none() {
|
||||
let conn = connect_vsock(self.cid, self.port).await.map(|e| {
|
||||
error!(self.logger, "failed to obtain connection"; "error" => format!("{:?}", e));
|
||||
e
|
||||
})?;
|
||||
|
||||
match conn {
|
||||
Ok(mut c) => handle_batch(&mut *c, batch),
|
||||
Err(e) => {
|
||||
error!(self.logger, "failed to obtain connection";
|
||||
"error" => format!("{}", e));
|
||||
|
||||
return Err(Error::ConnectionError(e.to_string()).into());
|
||||
}
|
||||
self.conn = Some(Arc::new(Mutex::new(conn)));
|
||||
}
|
||||
|
||||
handle_batch(self.conn.as_ref().unwrap().clone(), batch)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(self.logger, "handle_batch error: {:?}", e);
|
||||
if e.kind() == ErrorKind::NotConnected {
|
||||
info!(self.logger, "drop connection");
|
||||
self.conn.take();
|
||||
}
|
||||
|
||||
Error::IOError(e)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn shutdown(&mut self) {
|
||||
let conn = match self.conn.lock() {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
error!(self.logger, "failed to obtain connection";
|
||||
"error" => format!("{}", e));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
conn.shutdown(Shutdown::Write)
|
||||
.expect("failed to shutdown VSOCK connection");
|
||||
self.conn.take();
|
||||
}
|
||||
}
|
||||
|
||||
@ -169,8 +175,6 @@ impl Builder {
|
||||
pub fn init(self) -> Exporter {
|
||||
let Builder { port, cid, logger } = self;
|
||||
|
||||
let sock_addr = SockAddr::new_vsock(self.cid, self.port);
|
||||
|
||||
let cid_str: String;
|
||||
|
||||
if self.cid == libc::VMADDR_CID_ANY {
|
||||
@ -179,18 +183,18 @@ impl Builder {
|
||||
cid_str = format!("{}", self.cid);
|
||||
}
|
||||
|
||||
let msg = format!(
|
||||
"failed to connect to VSOCK server (port: {}, cid: {}) - {}",
|
||||
self.port, cid_str, "ensure trace forwarder is running on host"
|
||||
);
|
||||
|
||||
let conn = VsockStream::connect(&sock_addr).expect(&msg);
|
||||
|
||||
Exporter {
|
||||
port,
|
||||
cid,
|
||||
conn: Mutex::new(conn),
|
||||
conn: None,
|
||||
logger: logger.new(o!("cid" => cid_str, "port" => port)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_vsock(cid: u32, port: u32) -> Result<VsockStream, Error> {
|
||||
match VsockStream::connect(cid, port).await {
|
||||
Ok(conn) => Ok(conn),
|
||||
Err(e) => Err(Error::ConnectionError(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user