refactor(runtime-rs): Use RwLock in runtime agent

Use RwLock for Agent in runtime, for better concurrency.

Fixes: #5199
Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
Ji-Xinyou 2022-09-20 16:48:15 +08:00
parent e05e42fd3c
commit 50299a3292

View File

@ -7,12 +7,15 @@
mod agent;
mod trans;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::{
os::unix::io::{IntoRawFd, RawFd},
sync::Arc,
};
use anyhow::{Context, Result};
use kata_types::config::Agent as AgentConfig;
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use ttrpc::asynchronous::Client;
use crate::{log_forwarder::LogForwarder, sock};
@ -41,27 +44,25 @@ pub(crate) struct KataAgentInner {
log_forwarder: LogForwarder,
}
unsafe impl Send for KataAgent {}
unsafe impl Sync for KataAgent {}
pub struct KataAgent {
pub(crate) inner: Mutex<KataAgentInner>,
pub(crate) inner: Arc<RwLock<KataAgentInner>>,
}
impl KataAgent {
pub fn new(config: AgentConfig) -> Self {
KataAgent {
inner: Mutex::new(KataAgentInner {
inner: Arc::new(RwLock::new(KataAgentInner {
client: None,
client_fd: -1,
socket_address: "".to_string(),
config,
log_forwarder: LogForwarder::new(),
}),
})),
}
}
pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> {
let inner = self.inner.lock().await;
let inner = self.inner.read().await;
inner.client.as_ref().map(|c| {
(
health_ttrpc::HealthClient::new(c.clone()),
@ -72,7 +73,7 @@ impl KataAgent {
}
pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> {
let inner = self.inner.lock().await;
let inner = self.inner.read().await;
inner.client.as_ref().map(|c| {
(
agent_ttrpc::AgentServiceClient::new(c.clone()),
@ -83,13 +84,13 @@ impl KataAgent {
}
pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.write().await;
inner.socket_address = address.to_string();
Ok(())
}
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.write().await;
let config = sock::ConnectConfig::new(
inner.config.dial_timeout_ms as u64,
@ -107,7 +108,7 @@ impl KataAgent {
}
pub(crate) async fn start_log_forwarder(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.write().await;
let config = sock::ConnectConfig::new(
inner.config.dial_timeout_ms as u64,
inner.config.reconnect_timeout_ms as u64,
@ -123,17 +124,17 @@ impl KataAgent {
}
pub(crate) async fn stop_log_forwarder(&self) {
let mut inner = self.inner.lock().await;
let mut inner = self.inner.write().await;
inner.log_forwarder.stop();
}
pub(crate) async fn agent_sock(&self) -> Result<String> {
let inner = self.inner.lock().await;
let inner = self.inner.read().await;
Ok(inner.socket_address.clone())
}
pub(crate) async fn agent_config(&self) -> AgentConfig {
let inner = self.inner.lock().await;
let inner = self.inner.read().await;
inner.config.clone()
}
}