mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
refactor(runtime-rs): Use RwLock in runtime agent
Use RwLock for Agent in runtime, for better concurrency. Fixes: #5199 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
parent
e05e42fd3c
commit
50299a3292
@ -7,12 +7,15 @@
|
||||
mod agent;
|
||||
mod trans;
|
||||
|
||||
use std::os::unix::io::{IntoRawFd, RawFd};
|
||||
use std::{
|
||||
os::unix::io::{IntoRawFd, RawFd},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use kata_types::config::Agent as AgentConfig;
|
||||
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use ttrpc::asynchronous::Client;
|
||||
|
||||
use crate::{log_forwarder::LogForwarder, sock};
|
||||
@ -41,27 +44,25 @@ pub(crate) struct KataAgentInner {
|
||||
log_forwarder: LogForwarder,
|
||||
}
|
||||
|
||||
unsafe impl Send for KataAgent {}
|
||||
unsafe impl Sync for KataAgent {}
|
||||
pub struct KataAgent {
|
||||
pub(crate) inner: Mutex<KataAgentInner>,
|
||||
pub(crate) inner: Arc<RwLock<KataAgentInner>>,
|
||||
}
|
||||
|
||||
impl KataAgent {
|
||||
pub fn new(config: AgentConfig) -> Self {
|
||||
KataAgent {
|
||||
inner: Mutex::new(KataAgentInner {
|
||||
inner: Arc::new(RwLock::new(KataAgentInner {
|
||||
client: None,
|
||||
client_fd: -1,
|
||||
socket_address: "".to_string(),
|
||||
config,
|
||||
log_forwarder: LogForwarder::new(),
|
||||
}),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> {
|
||||
let inner = self.inner.lock().await;
|
||||
let inner = self.inner.read().await;
|
||||
inner.client.as_ref().map(|c| {
|
||||
(
|
||||
health_ttrpc::HealthClient::new(c.clone()),
|
||||
@ -72,7 +73,7 @@ impl KataAgent {
|
||||
}
|
||||
|
||||
pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> {
|
||||
let inner = self.inner.lock().await;
|
||||
let inner = self.inner.read().await;
|
||||
inner.client.as_ref().map(|c| {
|
||||
(
|
||||
agent_ttrpc::AgentServiceClient::new(c.clone()),
|
||||
@ -83,13 +84,13 @@ impl KataAgent {
|
||||
}
|
||||
|
||||
pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.socket_address = address.to_string();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
let config = sock::ConnectConfig::new(
|
||||
inner.config.dial_timeout_ms as u64,
|
||||
@ -107,7 +108,7 @@ impl KataAgent {
|
||||
}
|
||||
|
||||
pub(crate) async fn start_log_forwarder(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let mut inner = self.inner.write().await;
|
||||
let config = sock::ConnectConfig::new(
|
||||
inner.config.dial_timeout_ms as u64,
|
||||
inner.config.reconnect_timeout_ms as u64,
|
||||
@ -123,17 +124,17 @@ impl KataAgent {
|
||||
}
|
||||
|
||||
pub(crate) async fn stop_log_forwarder(&self) {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let mut inner = self.inner.write().await;
|
||||
inner.log_forwarder.stop();
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_sock(&self) -> Result<String> {
|
||||
let inner = self.inner.lock().await;
|
||||
let inner = self.inner.read().await;
|
||||
Ok(inner.socket_address.clone())
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
||||
let inner = self.inner.lock().await;
|
||||
let inner = self.inner.read().await;
|
||||
inner.config.clone()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user