Merge pull request #11377 from justxuewei/hvsock-logging

This commit is contained in:
Xuewei Niu 2025-06-10 16:45:59 +08:00 committed by GitHub
commit ac6779428f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 91 additions and 62 deletions

View File

@ -111,6 +111,7 @@ impl KataAgent {
); );
let sock = let sock =
sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?; sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?;
info!(sl!(), "try to connect agent server through {:?}", sock);
let stream = sock.connect(&config).await.context("connect")?; let stream = sock.connect(&config).await.context("connect")?;
let fd = stream.into_raw_fd(); let fd = stream.into_raw_fd();
info!( info!(

View File

@ -44,42 +44,41 @@ impl LogForwarder {
let logger = sl!().clone(); let logger = sl!().clone();
let address = address.to_string(); let address = address.to_string();
let task_handler = tokio::spawn(async move { let task_handler = tokio::spawn(async move {
loop { let sock = match sock::new(&address, port) {
info!(logger, "try to connect to get agent log"); Ok(sock) => sock,
let sock = match sock::new(&address, port) { Err(err) => {
Ok(sock) => sock, error!(
Err(err) => { sl!(),
error!( "failed to new sock for address {:?} port {} error {:?}",
sl!(), address,
"failed to new sock for address {:?} port {} error {:?}", port,
address, err
port, );
err return;
); }
return; };
} info!(logger, "try to connect to agent-log");
};
match sock.connect(&config).await { match sock.connect(&config).await {
Ok(stream) => { Ok(stream) => {
let stream = BufReader::new(stream); info!(logger, "connected to agent-log successfully");
let mut lines = stream.lines(); let stream = BufReader::new(stream);
while let Ok(Some(l)) = lines.next_line().await { let mut lines = stream.lines();
match parse_agent_log_level(&l) { while let Ok(Some(l)) = lines.next_line().await {
LOG_LEVEL_TRACE => trace!(sl!(), "{}", l), match parse_agent_log_level(&l) {
LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l), LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
LOG_LEVEL_WARNING => warn!(sl!(), "{}", l), LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
LOG_LEVEL_ERROR => error!(sl!(), "{}", l), LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l), LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
_ => info!(sl!(), "{}", l), LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
} _ => info!(sl!(), "{}", l),
} }
} }
Err(err) => {
warn!(logger, "connect agent vsock failed: {:?}", err);
}
} }
} Err(err) => {
warn!(logger, "failed to connect agent-log, err: {:?}", err);
}
};
}); });
self.task_handler = Some(task_handler); self.task_handler = Some(task_handler);
Ok(()) Ok(())

View File

@ -4,8 +4,6 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use std::os::unix::prelude::AsRawFd;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::{ use tokio::{
@ -33,32 +31,43 @@ impl HybridVsock {
#[async_trait] #[async_trait]
impl Sock for HybridVsock { impl Sock for HybridVsock {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> { async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
for i in 0..retry_times { for i in 0..retry_times {
match connect_helper(&self.uds, self.port).await { match connect_helper(&self.uds, self.port).await {
Ok(stream) => { Ok(stream) => {
info!( info!(sl!(), "hybrid vsock: connected to {:?}", self);
sl!(),
"connect success on {} current client fd {}",
i,
stream.as_raw_fd()
);
return Ok(Stream::Unix(stream)); return Ok(Stream::Unix(stream));
} }
Err(err) => { Err(err) => {
debug!(sl!(), "connect on {} err : {:?}", i, err); trace!(
sl!(),
"hybrid vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
err,
i,
config.dial_timeout_ms,
);
last_err = Some(err);
tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms)) tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
.await; .await;
continue; continue;
} }
} }
} }
Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!(
"hybrid vsock: failed to connect to {:?}, err {:?}",
self,
last_err.unwrap()
))
} }
} }
async fn connect_helper(uds: &str, port: u32) -> Result<UnixStream> { async fn connect_helper(uds: &str, port: u32) -> Result<UnixStream> {
info!(sl!(), "connect uds {:?} port {}", &uds, port);
let mut stream = UnixStream::connect(&uds).await.context("connect")?; let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream stream
.write_all(format!("connect {}\n", port).as_bytes()) .write_all(format!("connect {}\n", port).as_bytes())

View File

@ -105,7 +105,7 @@ enum SockType {
} }
#[async_trait] #[async_trait]
pub trait Sock: Send + Sync { pub trait Sock: Send + Sync + std::fmt::Debug {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream>; async fn connect(&self, config: &ConnectConfig) -> Result<Stream>;
} }

View File

@ -4,7 +4,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use std::{os::unix::prelude::AsRawFd, path::Path}; use std::path::Path;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
@ -26,27 +26,39 @@ impl Remote {
#[async_trait] #[async_trait]
impl Sock for Remote { impl Sock for Remote {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> { async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
for i in 0..retry_times { for i in 0..retry_times {
match connect_helper(&self.path).await { match connect_helper(&self.path).await {
Ok(stream) => { Ok(stream) => {
info!( info!(sl!(), "remote sock: connected to {:?}", self);
sl!(),
"remote connect success on {} current client fd {}",
i,
stream.as_raw_fd()
);
return Ok(Stream::Unix(stream)); return Ok(Stream::Unix(stream));
} }
Err(err) => { Err(err) => {
debug!(sl!(), "remote connect on {} err : {:?}", i, err); trace!(
sl!(),
"remote sock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
err,
i,
config.dial_timeout_ms
);
last_err = Some(err);
tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms)) tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
.await; .await;
continue; continue;
} }
} }
} }
Err(anyhow!("cannot connect to agent ttrpc server {:?}", config))
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!(
"remote sock: failed to connect to {:?}, err {:?}",
self,
last_err.unwrap()
))
} }
} }

View File

@ -31,6 +31,7 @@ impl Vsock {
#[async_trait] #[async_trait]
impl Sock for Vsock { impl Sock for Vsock {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> { async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let mut last_err = None;
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms; let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
let sock_addr = VsockAddr::new(self.vsock_cid, self.port); let sock_addr = VsockAddr::new(self.vsock_cid, self.port);
let connect_once = || { let connect_once = || {
@ -61,23 +62,30 @@ impl Sock for Vsock {
for i in 0..retry_times { for i in 0..retry_times {
match connect_once() { match connect_once() {
Ok(stream) => { Ok(stream) => {
info!( info!(sl!(), "vsock: connected to {:?}", self);
sl!(),
"connect vsock success on {} current client fd {}",
i,
stream.as_raw_fd()
);
return Ok(Stream::Vsock(stream)); return Ok(Stream::Vsock(stream));
} }
Err(e) => { Err(e) => {
debug!(sl!(), "retry after {} ms: failed to connect to agent via vsock at {} attempts: {:?}", config.dial_timeout_ms, i, e); trace!(
sl!(),
"vsock: failed to connect to {:?}, err {:?}, attempts {}, will retry after {} ms",
self,
e,
i,
config.dial_timeout_ms,
);
last_err = Some(e);
tokio::time::sleep(Duration::from_millis(config.dial_timeout_ms)).await; tokio::time::sleep(Duration::from_millis(config.dial_timeout_ms)).await;
} }
} }
} }
// Safe to unwrap the last_err, as this line will be unreachable if
// no errors occurred.
Err(anyhow!( Err(anyhow!(
"cannot connect vsock to agent ttrpc server {:?}", "vsock: failed to connect to {:?}, err {:?}",
config self,
last_err.unwrap()
)) ))
} }
} }