From 77ca2fe88b8860f222dea122249c2a188bd22d27 Mon Sep 17 00:00:00 2001 From: Xuewei Niu Date: Fri, 7 Feb 2025 11:07:20 +0800 Subject: [PATCH] runtime-rs: Reduce the number of duplicate log entries being printed When connecting to guest through vsock, a log is printed for each failure. The failure comes from two main reasons: (1) the guest is not ready or (2) some real errors happen. Printing logs for the first case leads to log clutter, and your logs will like this: ``` Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/... Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/... Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/... Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/... Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/... ``` To avoid this, the sock implmentations save the last error and return it after all retries are exhausted. Users are able to check all errors by setting the log level to trace. Reorganize the log format to "{sock type}: {message}" to make it clearer. Apart from that, errors return by the socks use `self`, instead of `ConnectConfig`, since the `ConnectConfig` doesn't provide any useful information. Disable infinite loop for the log forwarder. There is retry logic in the sock implmentations. We can consider the agent-log unavailable if `sock.connect()` encounters an error. Fixes: #10847 Signed-off-by: Xuewei Niu --- src/runtime-rs/crates/agent/src/kata/mod.rs | 1 + .../crates/agent/src/log_forwarder.rs | 63 +++++++++---------- .../crates/agent/src/sock/hybrid_vsock.rs | 31 +++++---- src/runtime-rs/crates/agent/src/sock/mod.rs | 2 +- .../crates/agent/src/sock/remote.rs | 30 ++++++--- src/runtime-rs/crates/agent/src/sock/vsock.rs | 26 +++++--- 6 files changed, 91 insertions(+), 62 deletions(-) diff --git a/src/runtime-rs/crates/agent/src/kata/mod.rs b/src/runtime-rs/crates/agent/src/kata/mod.rs index bfe3d0693..32350a8a9 100644 --- a/src/runtime-rs/crates/agent/src/kata/mod.rs +++ b/src/runtime-rs/crates/agent/src/kata/mod.rs @@ -111,6 +111,7 @@ impl KataAgent { ); let sock = sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?; + info!(sl!(), "try to connect agent server through {:?}", sock); let stream = sock.connect(&config).await.context("connect")?; let fd = stream.into_raw_fd(); info!( diff --git a/src/runtime-rs/crates/agent/src/log_forwarder.rs b/src/runtime-rs/crates/agent/src/log_forwarder.rs index 221ddd6af..a51c9f222 100644 --- a/src/runtime-rs/crates/agent/src/log_forwarder.rs +++ b/src/runtime-rs/crates/agent/src/log_forwarder.rs @@ -44,42 +44,41 @@ impl LogForwarder { let logger = sl!().clone(); let address = address.to_string(); let task_handler = tokio::spawn(async move { - loop { - info!(logger, "try to connect to get agent log"); - let sock = match sock::new(&address, port) { - Ok(sock) => sock, - Err(err) => { - error!( - sl!(), - "failed to new sock for address {:?} port {} error {:?}", - address, - port, - err - ); - return; - } - }; + let sock = match sock::new(&address, port) { + Ok(sock) => sock, + Err(err) => { + error!( + sl!(), + "failed to new sock for address {:?} port {} error {:?}", + address, + port, + err + ); + return; + } + }; + info!(logger, "try to connect to agent-log"); - match sock.connect(&config).await { - Ok(stream) => { - let stream = BufReader::new(stream); - let mut lines = stream.lines(); - while let Ok(Some(l)) = lines.next_line().await { - match parse_agent_log_level(&l) { - LOG_LEVEL_TRACE => trace!(sl!(), "{}", l), - LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l), - LOG_LEVEL_WARNING => warn!(sl!(), "{}", l), - LOG_LEVEL_ERROR => error!(sl!(), "{}", l), - LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l), - _ => info!(sl!(), "{}", l), - } + match sock.connect(&config).await { + Ok(stream) => { + info!(logger, "connected to agent-log successfully"); + let stream = BufReader::new(stream); + let mut lines = stream.lines(); + while let Ok(Some(l)) = lines.next_line().await { + match parse_agent_log_level(&l) { + LOG_LEVEL_TRACE => trace!(sl!(), "{}", l), + LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l), + LOG_LEVEL_WARNING => warn!(sl!(), "{}", l), + LOG_LEVEL_ERROR => error!(sl!(), "{}", l), + LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l), + _ => info!(sl!(), "{}", l), } } - Err(err) => { - warn!(logger, "connect agent vsock failed: {:?}", err); - } } - } + Err(err) => { + warn!(logger, "failed to connect agent-log, err: {:?}", err); + } + }; }); self.task_handler = Some(task_handler); Ok(()) diff --git a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs index 1b19a65b0..03c89cd89 100644 --- a/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs +++ b/src/runtime-rs/crates/agent/src/sock/hybrid_vsock.rs @@ -4,8 +4,6 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::os::unix::prelude::AsRawFd; - use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use tokio::{ @@ -33,32 +31,43 @@ impl HybridVsock { #[async_trait] impl Sock for HybridVsock { async fn connect(&self, config: &ConnectConfig) -> Result { + let mut last_err = None; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; + for i in 0..retry_times { match connect_helper(&self.uds, self.port).await { Ok(stream) => { - info!( - sl!(), - "connect success on {} current client fd {}", - i, - stream.as_raw_fd() - ); + info!(sl!(), "hybrid vsock: connected to {:?}", self); return Ok(Stream::Unix(stream)); } Err(err) => { - debug!(sl!(), "connect on {} err : {:?}", i, err); + trace!( + sl!(), + "hybrid vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms", + self, + err, + i, + config.dial_timeout_ms, + ); + last_err = Some(err); tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms)) .await; continue; } } } - Err(anyhow!("cannot connect to agent ttrpc server {:?}", config)) + + // Safe to unwrap the last_err, as this line will be unreachable if + // no errors occurred. + Err(anyhow!( + "hybrid vsock: failed to connect to {:?}, err {:?}", + self, + last_err.unwrap() + )) } } async fn connect_helper(uds: &str, port: u32) -> Result { - info!(sl!(), "connect uds {:?} port {}", &uds, port); let mut stream = UnixStream::connect(&uds).await.context("connect")?; stream .write_all(format!("connect {}\n", port).as_bytes()) diff --git a/src/runtime-rs/crates/agent/src/sock/mod.rs b/src/runtime-rs/crates/agent/src/sock/mod.rs index 21809253b..a82da7907 100644 --- a/src/runtime-rs/crates/agent/src/sock/mod.rs +++ b/src/runtime-rs/crates/agent/src/sock/mod.rs @@ -105,7 +105,7 @@ enum SockType { } #[async_trait] -pub trait Sock: Send + Sync { +pub trait Sock: Send + Sync + std::fmt::Debug { async fn connect(&self, config: &ConnectConfig) -> Result; } diff --git a/src/runtime-rs/crates/agent/src/sock/remote.rs b/src/runtime-rs/crates/agent/src/sock/remote.rs index c22d9ccbc..be788dccb 100644 --- a/src/runtime-rs/crates/agent/src/sock/remote.rs +++ b/src/runtime-rs/crates/agent/src/sock/remote.rs @@ -4,7 +4,7 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{os::unix::prelude::AsRawFd, path::Path}; +use std::path::Path; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; @@ -26,27 +26,39 @@ impl Remote { #[async_trait] impl Sock for Remote { async fn connect(&self, config: &ConnectConfig) -> Result { + let mut last_err = None; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; + for i in 0..retry_times { match connect_helper(&self.path).await { Ok(stream) => { - info!( - sl!(), - "remote connect success on {} current client fd {}", - i, - stream.as_raw_fd() - ); + info!(sl!(), "remote sock: connected to {:?}", self); return Ok(Stream::Unix(stream)); } Err(err) => { - debug!(sl!(), "remote connect on {} err : {:?}", i, err); + trace!( + sl!(), + "remote sock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms", + self, + err, + i, + config.dial_timeout_ms + ); + last_err = Some(err); tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms)) .await; continue; } } } - Err(anyhow!("cannot connect to agent ttrpc server {:?}", config)) + + // Safe to unwrap the last_err, as this line will be unreachable if + // no errors occurred. + Err(anyhow!( + "remote sock: failed to connect to {:?}, err {:?}", + self, + last_err.unwrap() + )) } } diff --git a/src/runtime-rs/crates/agent/src/sock/vsock.rs b/src/runtime-rs/crates/agent/src/sock/vsock.rs index 604b0d4ca..2612d8133 100644 --- a/src/runtime-rs/crates/agent/src/sock/vsock.rs +++ b/src/runtime-rs/crates/agent/src/sock/vsock.rs @@ -31,6 +31,7 @@ impl Vsock { #[async_trait] impl Sock for Vsock { async fn connect(&self, config: &ConnectConfig) -> Result { + let mut last_err = None; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; let sock_addr = VsockAddr::new(self.vsock_cid, self.port); let connect_once = || { @@ -61,23 +62,30 @@ impl Sock for Vsock { for i in 0..retry_times { match connect_once() { Ok(stream) => { - info!( - sl!(), - "connect vsock success on {} current client fd {}", - i, - stream.as_raw_fd() - ); + info!(sl!(), "vsock: connected to {:?}", self); return Ok(Stream::Vsock(stream)); } Err(e) => { - debug!(sl!(), "retry after {} ms: failed to connect to agent via vsock at {} attempts: {:?}", config.dial_timeout_ms, i, e); + trace!( + sl!(), + "vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms", + self, + e, + i, + config.dial_timeout_ms, + ); + last_err = Some(e); tokio::time::sleep(Duration::from_millis(config.dial_timeout_ms)).await; } } } + + // Safe to unwrap the last_err, as this line will be unreachable if + // no errors occurred. Err(anyhow!( - "cannot connect vsock to agent ttrpc server {:?}", - config + "vsock: failed to connect to {:?}, err {:?}", + self, + last_err.unwrap() )) } }