diff --git a/src/runtime-rs/crates/agent/src/kata/mod.rs b/src/runtime-rs/crates/agent/src/kata/mod.rs
index bfe3d0693..32350a8a9 100644
--- a/src/runtime-rs/crates/agent/src/kata/mod.rs
+++ b/src/runtime-rs/crates/agent/src/kata/mod.rs
@@ -111,6 +111,7 @@ impl KataAgent {
         );
         let sock =
             sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?;
+        info!(sl!(), "try to connect agent server through {:?}", sock);
         let stream = sock.connect(&config).await.context("connect")?;
         let fd = stream.into_raw_fd();
         info!(
diff --git a/src/runtime-rs/crates/agent/src/log_forwarder.rs b/src/runtime-rs/crates/agent/src/log_forwarder.rs
index 221ddd6af..a51c9f222 100644
--- a/src/runtime-rs/crates/agent/src/log_forwarder.rs
+++ b/src/runtime-rs/crates/agent/src/log_forwarder.rs
@@ -44,42 +44,41 @@ impl LogForwarder {
         let logger = sl!().clone();
         let address = address.to_string();
         let task_handler = tokio::spawn(async move {
-            loop {
-                info!(logger, "try to connect to get agent log");
-                let sock = match sock::new(&address, port) {
-                    Ok(sock) => sock,
-                    Err(err) => {
-                        error!(
-                            sl!(),
-                            "failed to new sock for address {:?} port {} error {:?}",
-                            address,
-                            port,
-                            err
-                        );
-                        return;
-                    }
-                };
+            let sock = match sock::new(&address, port) {
+                Ok(sock) => sock,
+                Err(err) => {
+                    error!(
+                        sl!(),
+                        "failed to new sock for address {:?} port {} error {:?}",
+                        address,
+                        port,
+                        err
+                    );
+                    return;
+                }
+            };
+            info!(logger, "try to connect to agent-log");
 
-                match sock.connect(&config).await {
-                    Ok(stream) => {
-                        let stream = BufReader::new(stream);
-                        let mut lines = stream.lines();
-                        while let Ok(Some(l)) = lines.next_line().await {
-                            match parse_agent_log_level(&l) {
-                                LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
-                                LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
-                                LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
-                                LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
-                                LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
-                                _ => info!(sl!(), "{}", l),
-                            }
+            match sock.connect(&config).await {
+                Ok(stream) => {
+                    info!(logger, "connected to agent-log successfully");
+                    let stream = BufReader::new(stream);
+                    let mut lines = stream.lines();
+                    while let Ok(Some(l)) = lines.next_line().await {
+                        match parse_agent_log_level(&l) {
+                            LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
+                            LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
+                            LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
+                            LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
+                            LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
+                            _ => info!(sl!(), "{}", l),
                         }
                     }
-                    Err(err) => {
-                        warn!(logger, "connect agent vsock failed: {:?}", err);
-                    }
                 }
-            }
+                Err(err) => {
+                    warn!(logger, "failed to connect agent-log, err: {:?}", err);
+                }
+            };
         });
         self.task_handler = Some(task_handler);
         Ok(())
diff --git a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs
index 1b19a65b0..03c89cd89 100644
--- a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs
+++ b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs
@@ -4,8 +4,6 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 
-use std::os::unix::prelude::AsRawFd;
-
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use tokio::{
@@ -33,32 +31,43 @@ impl HybridVsock {
 #[async_trait]
 impl Sock for HybridVsock {
     async fn connect(&self, config: &ConnectConfig) -> Result {
+        let mut last_err = None;
         let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
+
         for i in 0..retry_times {
             match connect_helper(&self.uds, self.port).await {
                 Ok(stream) => {
-                    info!(
-                        sl!(),
-                        "connect success on {} current client fd {}",
-                        i,
-                        stream.as_raw_fd()
-                    );
+                    info!(sl!(), "hybrid vsock: connected to {:?}", self);
                     return Ok(Stream::Unix(stream));
                 }
                 Err(err) => {
-                    debug!(sl!(), "connect on {} err : {:?}", i, err);
+                    trace!(
+                        sl!(),
+                        "hybrid vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
+                        self,
+                        err,
+                        i,
+                        config.dial_timeout_ms,
+                    );
+                    last_err = Some(err);
                     tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
                         .await;
                     continue;
                 }
             }
         }
-        Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
+
+        // `last_err` is still `None` if the loop never ran (`retry_times` == 0),
+        // so format it without unwrapping to avoid a panic on that path.
+        Err(anyhow!(
+            "hybrid vsock: failed to connect to {:?}, err {}",
+            self,
+            last_err.map_or("no attempt made".to_string(), |e| format!("{:?}", e))
+        ))
     }
 }
 
 async fn connect_helper(uds: &str, port: u32) -> Result {
-    info!(sl!(), "connect uds {:?} port {}", &uds, port);
     let mut stream = UnixStream::connect(&uds).await.context("connect")?;
     stream
         .write_all(format!("connect {}\n", port).as_bytes())
diff --git a/src/runtime-rs/crates/agent/src/sock/mod.rs b/src/runtime-rs/crates/agent/src/sock/mod.rs
index 21809253b..a82da7907 100644
--- a/src/runtime-rs/crates/agent/src/sock/mod.rs
+++ b/src/runtime-rs/crates/agent/src/sock/mod.rs
@@ -105,7 +105,7 @@ enum SockType {
 }
 
 #[async_trait]
-pub trait Sock: Send + Sync {
+pub trait Sock: Send + Sync + std::fmt::Debug {
     async fn connect(&self, config: &ConnectConfig) -> Result;
 }
 
diff --git a/src/runtime-rs/crates/agent/src/sock/remote.rs b/src/runtime-rs/crates/agent/src/sock/remote.rs
index c22d9ccbc..be788dccb 100644
--- a/src/runtime-rs/crates/agent/src/sock/remote.rs
+++ b/src/runtime-rs/crates/agent/src/sock/remote.rs
@@ -4,7 +4,7 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 
-use std::{os::unix::prelude::AsRawFd, path::Path};
+use std::path::Path;
 
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
@@ -26,27 +26,39 @@ impl Remote {
 #[async_trait]
 impl Sock for Remote {
     async fn connect(&self, config: &ConnectConfig) -> Result {
+        let mut last_err = None;
         let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
+
         for i in 0..retry_times {
             match connect_helper(&self.path).await {
                 Ok(stream) => {
-                    info!(
-                        sl!(),
-                        "remote connect success on {} current client fd {}",
-                        i,
-                        stream.as_raw_fd()
-                    );
+                    info!(sl!(), "remote sock: connected to {:?}", self);
                     return Ok(Stream::Unix(stream));
                 }
                 Err(err) => {
-                    debug!(sl!(), "remote connect on {} err : {:?}", i, err);
+                    trace!(
+                        sl!(),
+                        "remote sock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
+                        self,
+                        err,
+                        i,
+                        config.dial_timeout_ms
+                    );
+                    last_err = Some(err);
                     tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
                         .await;
                     continue;
                 }
             }
         }
-        Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
+
+        // `last_err` is still `None` if the loop never ran (`retry_times` == 0),
+        // so format it without unwrapping to avoid a panic on that path.
+        Err(anyhow!(
+            "remote sock: failed to connect to {:?}, err {}",
+            self,
+            last_err.map_or("no attempt made".to_string(), |e| format!("{:?}", e))
+        ))
     }
 }
 
diff --git a/src/runtime-rs/crates/agent/src/sock/vsock.rs b/src/runtime-rs/crates/agent/src/sock/vsock.rs
index 604b0d4ca..2612d8133 100644
--- a/src/runtime-rs/crates/agent/src/sock/vsock.rs
+++ b/src/runtime-rs/crates/agent/src/sock/vsock.rs
@@ -31,6 +31,7 @@ impl Vsock {
 #[async_trait]
 impl Sock for Vsock {
     async fn connect(&self, config: &ConnectConfig) -> Result {
+        let mut last_err = None;
         let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
         let sock_addr = VsockAddr::new(self.vsock_cid, self.port);
         let connect_once = || {
@@ -61,23 +62,30 @@ impl Sock for Vsock {
         for i in 0..retry_times {
             match connect_once() {
                 Ok(stream) => {
-                    info!(
-                        sl!(),
-                        "connect vsock success on {} current client fd {}",
-                        i,
-                        stream.as_raw_fd()
-                    );
+                    info!(sl!(), "vsock: connected to {:?}", self);
                     return Ok(Stream::Vsock(stream));
                 }
                 Err(e) => {
-                    debug!(sl!(), "retry after {} ms: failed to connect to agent via vsock at {} attempts: {:?}", config.dial_timeout_ms, i, e);
+                    trace!(
+                        sl!(),
+                        "vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
+                        self,
+                        e,
+                        i,
+                        config.dial_timeout_ms,
+                    );
+                    last_err = Some(e);
                     tokio::time::sleep(Duration::from_millis(config.dial_timeout_ms)).await;
                 }
             }
         }
+
+        // `last_err` is still `None` if the loop never ran (`retry_times` == 0),
+        // so format it without unwrapping to avoid a panic on that path.
         Err(anyhow!(
-            "cannot connect vsock to agent ttrpc server {:?}",
-            config
+            "vsock: failed to connect to {:?}, err {}",
+            self,
+            last_err.map_or("no attempt made".to_string(), |e| format!("{:?}", e))
         ))
     }
 }