mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-05-01 05:04:26 +00:00
refactor(runtime-rs): Use RwLock in runtime agent
Use RwLock for Agent in runtime, for better concurrency. Fixes: #5199 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
parent
e05e42fd3c
commit
50299a3292
@ -7,12 +7,15 @@
|
|||||||
mod agent;
|
mod agent;
|
||||||
mod trans;
|
mod trans;
|
||||||
|
|
||||||
use std::os::unix::io::{IntoRawFd, RawFd};
|
use std::{
|
||||||
|
os::unix::io::{IntoRawFd, RawFd},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use kata_types::config::Agent as AgentConfig;
|
use kata_types::config::Agent as AgentConfig;
|
||||||
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
|
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::RwLock;
|
||||||
use ttrpc::asynchronous::Client;
|
use ttrpc::asynchronous::Client;
|
||||||
|
|
||||||
use crate::{log_forwarder::LogForwarder, sock};
|
use crate::{log_forwarder::LogForwarder, sock};
|
||||||
@ -41,27 +44,25 @@ pub(crate) struct KataAgentInner {
|
|||||||
log_forwarder: LogForwarder,
|
log_forwarder: LogForwarder,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for KataAgent {}
|
|
||||||
unsafe impl Sync for KataAgent {}
|
|
||||||
pub struct KataAgent {
|
pub struct KataAgent {
|
||||||
pub(crate) inner: Mutex<KataAgentInner>,
|
pub(crate) inner: Arc<RwLock<KataAgentInner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl KataAgent {
|
impl KataAgent {
|
||||||
pub fn new(config: AgentConfig) -> Self {
|
pub fn new(config: AgentConfig) -> Self {
|
||||||
KataAgent {
|
KataAgent {
|
||||||
inner: Mutex::new(KataAgentInner {
|
inner: Arc::new(RwLock::new(KataAgentInner {
|
||||||
client: None,
|
client: None,
|
||||||
client_fd: -1,
|
client_fd: -1,
|
||||||
socket_address: "".to_string(),
|
socket_address: "".to_string(),
|
||||||
config,
|
config,
|
||||||
log_forwarder: LogForwarder::new(),
|
log_forwarder: LogForwarder::new(),
|
||||||
}),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> {
|
pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> {
|
||||||
let inner = self.inner.lock().await;
|
let inner = self.inner.read().await;
|
||||||
inner.client.as_ref().map(|c| {
|
inner.client.as_ref().map(|c| {
|
||||||
(
|
(
|
||||||
health_ttrpc::HealthClient::new(c.clone()),
|
health_ttrpc::HealthClient::new(c.clone()),
|
||||||
@ -72,7 +73,7 @@ impl KataAgent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> {
|
pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> {
|
||||||
let inner = self.inner.lock().await;
|
let inner = self.inner.read().await;
|
||||||
inner.client.as_ref().map(|c| {
|
inner.client.as_ref().map(|c| {
|
||||||
(
|
(
|
||||||
agent_ttrpc::AgentServiceClient::new(c.clone()),
|
agent_ttrpc::AgentServiceClient::new(c.clone()),
|
||||||
@ -83,13 +84,13 @@ impl KataAgent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> {
|
pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> {
|
||||||
let mut inner = self.inner.lock().await;
|
let mut inner = self.inner.write().await;
|
||||||
inner.socket_address = address.to_string();
|
inner.socket_address = address.to_string();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
|
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
|
||||||
let mut inner = self.inner.lock().await;
|
let mut inner = self.inner.write().await;
|
||||||
|
|
||||||
let config = sock::ConnectConfig::new(
|
let config = sock::ConnectConfig::new(
|
||||||
inner.config.dial_timeout_ms as u64,
|
inner.config.dial_timeout_ms as u64,
|
||||||
@ -107,7 +108,7 @@ impl KataAgent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn start_log_forwarder(&self) -> Result<()> {
|
pub(crate) async fn start_log_forwarder(&self) -> Result<()> {
|
||||||
let mut inner = self.inner.lock().await;
|
let mut inner = self.inner.write().await;
|
||||||
let config = sock::ConnectConfig::new(
|
let config = sock::ConnectConfig::new(
|
||||||
inner.config.dial_timeout_ms as u64,
|
inner.config.dial_timeout_ms as u64,
|
||||||
inner.config.reconnect_timeout_ms as u64,
|
inner.config.reconnect_timeout_ms as u64,
|
||||||
@ -123,17 +124,17 @@ impl KataAgent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn stop_log_forwarder(&self) {
|
pub(crate) async fn stop_log_forwarder(&self) {
|
||||||
let mut inner = self.inner.lock().await;
|
let mut inner = self.inner.write().await;
|
||||||
inner.log_forwarder.stop();
|
inner.log_forwarder.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn agent_sock(&self) -> Result<String> {
|
pub(crate) async fn agent_sock(&self) -> Result<String> {
|
||||||
let inner = self.inner.lock().await;
|
let inner = self.inner.read().await;
|
||||||
Ok(inner.socket_address.clone())
|
Ok(inner.socket_address.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
||||||
let inner = self.inner.lock().await;
|
let inner = self.inner.read().await;
|
||||||
inner.config.clone()
|
inner.config.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user