From 50299a329224c26486957cdc58b805a8a830e115 Mon Sep 17 00:00:00 2001 From: Ji-Xinyou Date: Tue, 20 Sep 2022 16:48:15 +0800 Subject: [PATCH] refactor(runtime-rs): Use RwLock in runtime agent Use RwLock for Agent in runtime, for better concurrency. Fixes: #5199 Signed-off-by: Ji-Xinyou --- src/runtime-rs/crates/agent/src/kata/mod.rs | 31 +++++++++++---------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/runtime-rs/crates/agent/src/kata/mod.rs b/src/runtime-rs/crates/agent/src/kata/mod.rs index fc4fdef58c..920b55340c 100644 --- a/src/runtime-rs/crates/agent/src/kata/mod.rs +++ b/src/runtime-rs/crates/agent/src/kata/mod.rs @@ -7,12 +7,15 @@ mod agent; mod trans; -use std::os::unix::io::{IntoRawFd, RawFd}; +use std::{ + os::unix::io::{IntoRawFd, RawFd}, + sync::Arc, +}; use anyhow::{Context, Result}; use kata_types::config::Agent as AgentConfig; use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc}; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use ttrpc::asynchronous::Client; use crate::{log_forwarder::LogForwarder, sock}; @@ -41,27 +44,25 @@ pub(crate) struct KataAgentInner { log_forwarder: LogForwarder, } -unsafe impl Send for KataAgent {} -unsafe impl Sync for KataAgent {} pub struct KataAgent { - pub(crate) inner: Mutex, + pub(crate) inner: Arc>, } impl KataAgent { pub fn new(config: AgentConfig) -> Self { KataAgent { - inner: Mutex::new(KataAgentInner { + inner: Arc::new(RwLock::new(KataAgentInner { client: None, client_fd: -1, socket_address: "".to_string(), config, log_forwarder: LogForwarder::new(), - }), + })), } } pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> { - let inner = self.inner.lock().await; + let inner = self.inner.read().await; inner.client.as_ref().map(|c| { ( health_ttrpc::HealthClient::new(c.clone()), @@ -72,7 +73,7 @@ impl KataAgent { } pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> { - let inner = self.inner.lock().await; + let inner = self.inner.read().await; inner.client.as_ref().map(|c| { ( agent_ttrpc::AgentServiceClient::new(c.clone()), @@ -83,13 +84,13 @@ impl KataAgent { } pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.write().await; inner.socket_address = address.to_string(); Ok(()) } pub(crate) async fn connect_agent_server(&self) -> Result<()> { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.write().await; let config = sock::ConnectConfig::new( inner.config.dial_timeout_ms as u64, @@ -107,7 +108,7 @@ impl KataAgent { } pub(crate) async fn start_log_forwarder(&self) -> Result<()> { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.write().await; let config = sock::ConnectConfig::new( inner.config.dial_timeout_ms as u64, inner.config.reconnect_timeout_ms as u64, @@ -123,17 +124,17 @@ impl KataAgent { } pub(crate) async fn stop_log_forwarder(&self) { - let mut inner = self.inner.lock().await; + let mut inner = self.inner.write().await; inner.log_forwarder.stop(); } pub(crate) async fn agent_sock(&self) -> Result { - let inner = self.inner.lock().await; + let inner = self.inner.read().await; Ok(inner.socket_address.clone()) } pub(crate) async fn agent_config(&self) -> AgentConfig { - let inner = self.inner.lock().await; + let inner = self.inner.read().await; inner.config.clone() } }