runtime-rs: Reduce the number of duplicate log entries being printed

When connecting to the guest through vsock, a log is printed for each failure.
Failures have two main causes: (1) the guest is not ready yet or (2) a real
error has occurred. Printing logs for the first case leads to log clutter,
and your logs will look like this:

```
Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/...
Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/...
Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/...
Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/...
Feb 07 02:47:24 ubuntu containerd[520]: {"msg":"connect uds \"/run/kata/...
```

To avoid this, the sock implementations save the last error and return it
after all retries are exhausted. Users are able to check all errors by
setting the log level to trace.

Reorganize the log format to "{sock type}: {message}" to make it clearer.
Apart from that, errors returned by the socks use `self`, instead of
`ConnectConfig`, since the `ConnectConfig` doesn't provide any useful
information.

Disable infinite loop for the log forwarder. There is retry logic in the
sock implementations. We can consider the agent-log unavailable if
`sock.connect()` encounters an error.

Fixes: #10847

Signed-off-by: Xuewei Niu <niuxuewei.nxw@antgroup.com>
This commit is contained in:
Xuewei Niu 2025-02-07 11:07:20 +08:00
parent 8c3f8f8e21
commit 77ca2fe88b
6 changed files with 91 additions and 62 deletions

View File

@ -111,6 +111,7 @@ impl KataAgent {
);
let sock =
sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?;
info!(sl!(), "try to connect agent server through {:?}", sock);
let stream = sock.connect(&config).await.context("connect")?;
let fd = stream.into_raw_fd();
info!(

View File

@ -44,42 +44,41 @@ impl LogForwarder {
let logger = sl!().clone();
let address = address.to_string();
let task_handler = tokio::spawn(async move {
loop {
info!(logger, "try to connect to get agent log");
let sock = match sock::new(&address, port) {
Ok(sock) => sock,
Err(err) => {
error!(
sl!(),
"failed to new sock for address {:?} port {} error {:?}",
address,
port,
err
);
return;
}
};
let sock = match sock::new(&address, port) {
Ok(sock) => sock,
Err(err) => {
error!(
sl!(),
"failed to new sock for address {:?} port {} error {:?}",
address,
port,
err
);
return;
}
};
info!(logger, "try to connect to agent-log");
match sock.connect(&config).await {
Ok(stream) => {
let stream = BufReader::new(stream);
let mut lines = stream.lines();
while let Ok(Some(l)) = lines.next_line().await {
match parse_agent_log_level(&l) {
LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
_ => info!(sl!(), "{}", l),
}
match sock.connect(&config).await {
Ok(stream) => {
info!(logger, "connected to agent-log successfully");
let stream = BufReader::new(stream);
let mut lines = stream.lines();
while let Ok(Some(l)) = lines.next_line().await {
match parse_agent_log_level(&l) {
LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
_ => info!(sl!(), "{}", l),
}
}
Err(err) => {
warn!(logger, "connect agent vsock failed: {:?}", err);
}
}
}
Err(err) => {
warn!(logger, "failed to connect agent-log, err: {:?}", err);
}
};
});
self.task_handler = Some(task_handler);
Ok(())

View File

@ -4,8 +4,6 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::os::unix::prelude::AsRawFd;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use tokio::{
@ -33,32 +31,43 @@ impl HybridVsock {
#[async_trait]
impl Sock for HybridVsock {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
for i in 0..retry_times {
match connect_helper(&self.uds, self.port).await {
Ok(stream) => {
info!(
sl!(),
"connect success on {} current client fd {}",
i,
stream.as_raw_fd()
);
info!(sl!(), "hybrid vsock: connected to {:?}", self);
return Ok(Stream::Unix(stream));
}
Err(err) => {
debug!(sl!(), "connect on {} err : {:?}", i, err);
trace!(
sl!(),
"hybrid vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
err,
i,
config.dial_timeout_ms,
);
last_err = Some(err);
tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
.await;
continue;
}
}
}
Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!(
"hybrid vsock: failed to connect to {:?}, err {:?}",
self,
last_err.unwrap()
))
}
}
async fn connect_helper(uds: &str, port: u32) -> Result<UnixStream> {
info!(sl!(), "connect uds {:?} port {}", &uds, port);
let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream
.write_all(format!("connect {}\n", port).as_bytes())

View File

@ -105,7 +105,7 @@ enum SockType {
}
#[async_trait]
pub trait Sock: Send + Sync {
pub trait Sock: Send + Sync + std::fmt::Debug {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream>;
}

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::{os::unix::prelude::AsRawFd, path::Path};
use std::path::Path;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
@ -26,27 +26,39 @@ impl Remote {
#[async_trait]
impl Sock for Remote {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
for i in 0..retry_times {
match connect_helper(&self.path).await {
Ok(stream) => {
info!(
sl!(),
"remote connect success on {} current client fd {}",
i,
stream.as_raw_fd()
);
info!(sl!(), "remote sock: connected to {:?}", self);
return Ok(Stream::Unix(stream));
}
Err(err) => {
debug!(sl!(), "remote connect on {} err : {:?}", i, err);
trace!(
sl!(),
"remote sock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
err,
i,
config.dial_timeout_ms
);
last_err = Some(err);
tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
.await;
continue;
}
}
}
Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!(
"remote sock: failed to connect to {:?}, err {:?}",
self,
last_err.unwrap()
))
}
}

View File

@ -31,6 +31,7 @@ impl Vsock {
#[async_trait]
impl Sock for Vsock {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
let sock_addr = VsockAddr::new(self.vsock_cid, self.port);
let connect_once = || {
@ -61,23 +62,30 @@ impl Sock for Vsock {
for i in 0..retry_times {
match connect_once() {
Ok(stream) => {
info!(
sl!(),
"connect vsock success on {} current client fd {}",
i,
stream.as_raw_fd()
);
info!(sl!(), "vsock: connected to {:?}", self);
return Ok(Stream::Vsock(stream));
}
Err(e) => {
debug!(sl!(), "retry after {} ms: failed to connect to agent via vsock at {} attempts: {:?}", config.dial_timeout_ms, i, e);
trace!(
sl!(),
"vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
e,
i,
config.dial_timeout_ms,
);
last_err = Some(e);
tokio::time::sleep(Duration::from_millis(config.dial_timeout_ms)).await;
}
}
}
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!(
"cannot connect vsock to agent ttrpc server {:?}",
config
"vsock: failed to connect to {:?}, err {:?}",
self,
last_err.unwrap()
))
}
}