diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 16ddaca605..3a8387d47f 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -1798,7 +1798,8 @@ dependencies = [ "serde", "slog", "thiserror", - "vsock", + "tokio", + "tokio-vsock", ] [[package]] diff --git a/src/agent/vsock-exporter/Cargo.toml b/src/agent/vsock-exporter/Cargo.toml index acb4b5c2fa..e822ac695d 100644 --- a/src/agent/vsock-exporter/Cargo.toml +++ b/src/agent/vsock-exporter/Cargo.toml @@ -12,8 +12,9 @@ libc = "0.2.94" thiserror = "1.0.24" opentelemetry = { version = "0.14.0", features=["serialize"] } serde = { version = "1.0.126", features = ["derive"] } -vsock = "0.2.3" +tokio-vsock = "0.3.1" bincode = "1.3.3" byteorder = "1.4.3" slog = { version = "2.5.2", features = ["dynamic-keys", "max_level_trace", "release_max_level_info"] } async-trait = "0.1.50" +tokio = "1.2.0" diff --git a/src/agent/vsock-exporter/src/lib.rs b/src/agent/vsock-exporter/src/lib.rs index 99aaf341bb..e0e91c0cd9 100644 --- a/src/agent/vsock-exporter/src/lib.rs +++ b/src/agent/vsock-exporter/src/lib.rs @@ -16,11 +16,12 @@ use async_trait::async_trait; use byteorder::{ByteOrder, NetworkEndian}; use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use opentelemetry::sdk::export::ExportError; -use slog::{error, o, Logger}; -use std::io::{ErrorKind, Write}; -use std::net::Shutdown; -use std::sync::Mutex; -use vsock::{SockAddr, VsockStream}; +use slog::{error, info, o, Logger}; +use std::io::ErrorKind; +use std::sync::Arc; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; +use tokio_vsock::VsockStream; const ANY_CID: &str = "any"; @@ -38,7 +39,7 @@ const DEFAULT_PORT: u32 = 10240; pub struct Exporter { port: u32, cid: u32, - conn: Mutex, + conn: Option>>, logger: Logger, } @@ -70,7 +71,12 @@ fn make_io_error(desc: String) -> std::io::Error { } // Send a trace span to the forwarder running on the host. -fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Error> { +async fn write_span( + writer: Arc>, + span: &SpanData, +) -> Result<(), std::io::Error> { + let mut writer = writer.lock().await; + let encoded_payload: Vec = bincode::serialize(&span).map_err(|e| make_io_error(e.to_string()))?; @@ -83,18 +89,17 @@ fn write_span(writer: &mut dyn Write, span: &SpanData) -> Result<(), std::io::Er NetworkEndian::write_u64(&mut payload_len_as_bytes, payload_len); // Send the header - writer - .write_all(&payload_len_as_bytes) - .map_err(|e| make_io_error(format!("failed to write trace header: {:?}", e)))?; + writer.write_all(&payload_len_as_bytes).await?; - writer - .write_all(&encoded_payload) - .map_err(|e| make_io_error(format!("failed to write trace payload: {:?}", e))) + writer.write_all(&encoded_payload).await } -fn handle_batch(writer: &mut dyn Write, batch: Vec) -> ExportResult { +async fn handle_batch( + writer: Arc>, + batch: Vec, +) -> Result<(), std::io::Error> { for span_data in batch { - write_span(writer, &span_data).map_err(Error::IOError)?; + write_span(writer.clone(), &span_data).await?; } Ok(()) @@ -103,31 +108,32 @@ fn handle_batch(writer: &mut dyn Write, batch: Vec) -> ExportResult { #[async_trait] impl SpanExporter for Exporter { async fn export(&mut self, batch: Vec) -> ExportResult { - let conn = self.conn.lock(); + if self.conn.is_none() { + let conn = connect_vsock(self.cid, self.port).await.map(|e| { + error!(self.logger, "failed to obtain connection"; "error" => format!("{:?}", e)); + e + })?; - match conn { - Ok(mut c) => handle_batch(&mut *c, batch), - Err(e) => { - error!(self.logger, "failed to obtain connection"; - "error" => format!("{}", e)); - - return Err(Error::ConnectionError(e.to_string()).into()); - } + self.conn = Some(Arc::new(Mutex::new(conn))); } + + handle_batch(self.conn.as_ref().unwrap().clone(), batch) + .await + .map_err(|e| { + error!(self.logger, "handle_batch error: {:?}", e); + if e.kind() == ErrorKind::NotConnected { + info!(self.logger, "drop connection"); + self.conn.take(); + } + + Error::IOError(e) + })?; + + Ok(()) } fn shutdown(&mut self) { - let conn = match self.conn.lock() { - Ok(conn) => conn, - Err(e) => { - error!(self.logger, "failed to obtain connection"; - "error" => format!("{}", e)); - return; - } - }; - - conn.shutdown(Shutdown::Write) - .expect("failed to shutdown VSOCK connection"); + self.conn.take(); } } @@ -169,8 +175,6 @@ impl Builder { pub fn init(self) -> Exporter { let Builder { port, cid, logger } = self; - let sock_addr = SockAddr::new_vsock(self.cid, self.port); - let cid_str: String; if self.cid == libc::VMADDR_CID_ANY { @@ -179,18 +183,18 @@ impl Builder { cid_str = format!("{}", self.cid); } - let msg = format!( - "failed to connect to VSOCK server (port: {}, cid: {}) - {}", - self.port, cid_str, "ensure trace forwarder is running on host" - ); - - let conn = VsockStream::connect(&sock_addr).expect(&msg); - Exporter { port, cid, - conn: Mutex::new(conn), + conn: None, logger: logger.new(o!("cid" => cid_str, "port" => port)), } } } + +async fn connect_vsock(cid: u32, port: u32) -> Result { + match VsockStream::connect(cid, port).await { + Ok(conn) => Ok(conn), + Err(e) => Err(Error::ConnectionError(e.to_string())), + } +}