mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-10-31 01:13:02 +00:00 
			
		
		
		
	refactor(runtime-rs): Use RwLock in runtime agent
Use RwLock for Agent in runtime, for better concurrency. Fixes: #5199 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
		| @@ -7,12 +7,15 @@ | |||||||
| mod agent; | mod agent; | ||||||
| mod trans; | mod trans; | ||||||
|  |  | ||||||
| use std::os::unix::io::{IntoRawFd, RawFd}; | use std::{ | ||||||
|  |     os::unix::io::{IntoRawFd, RawFd}, | ||||||
|  |     sync::Arc, | ||||||
|  | }; | ||||||
|  |  | ||||||
| use anyhow::{Context, Result}; | use anyhow::{Context, Result}; | ||||||
| use kata_types::config::Agent as AgentConfig; | use kata_types::config::Agent as AgentConfig; | ||||||
| use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc}; | use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc}; | ||||||
| use tokio::sync::Mutex; | use tokio::sync::RwLock; | ||||||
| use ttrpc::asynchronous::Client; | use ttrpc::asynchronous::Client; | ||||||
|  |  | ||||||
| use crate::{log_forwarder::LogForwarder, sock}; | use crate::{log_forwarder::LogForwarder, sock}; | ||||||
| @@ -41,27 +44,25 @@ pub(crate) struct KataAgentInner { | |||||||
|     log_forwarder: LogForwarder, |     log_forwarder: LogForwarder, | ||||||
| } | } | ||||||
|  |  | ||||||
| unsafe impl Send for KataAgent {} |  | ||||||
| unsafe impl Sync for KataAgent {} |  | ||||||
| pub struct KataAgent { | pub struct KataAgent { | ||||||
|     pub(crate) inner: Mutex<KataAgentInner>, |     pub(crate) inner: Arc<RwLock<KataAgentInner>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl KataAgent { | impl KataAgent { | ||||||
|     pub fn new(config: AgentConfig) -> Self { |     pub fn new(config: AgentConfig) -> Self { | ||||||
|         KataAgent { |         KataAgent { | ||||||
|             inner: Mutex::new(KataAgentInner { |             inner: Arc::new(RwLock::new(KataAgentInner { | ||||||
|                 client: None, |                 client: None, | ||||||
|                 client_fd: -1, |                 client_fd: -1, | ||||||
|                 socket_address: "".to_string(), |                 socket_address: "".to_string(), | ||||||
|                 config, |                 config, | ||||||
|                 log_forwarder: LogForwarder::new(), |                 log_forwarder: LogForwarder::new(), | ||||||
|             }), |             })), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> { |     pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> { | ||||||
|         let inner = self.inner.lock().await; |         let inner = self.inner.read().await; | ||||||
|         inner.client.as_ref().map(|c| { |         inner.client.as_ref().map(|c| { | ||||||
|             ( |             ( | ||||||
|                 health_ttrpc::HealthClient::new(c.clone()), |                 health_ttrpc::HealthClient::new(c.clone()), | ||||||
| @@ -72,7 +73,7 @@ impl KataAgent { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> { |     pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> { | ||||||
|         let inner = self.inner.lock().await; |         let inner = self.inner.read().await; | ||||||
|         inner.client.as_ref().map(|c| { |         inner.client.as_ref().map(|c| { | ||||||
|             ( |             ( | ||||||
|                 agent_ttrpc::AgentServiceClient::new(c.clone()), |                 agent_ttrpc::AgentServiceClient::new(c.clone()), | ||||||
| @@ -83,13 +84,13 @@ impl KataAgent { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> { |     pub(crate) async fn set_socket_address(&self, address: &str) -> Result<()> { | ||||||
|         let mut inner = self.inner.lock().await; |         let mut inner = self.inner.write().await; | ||||||
|         inner.socket_address = address.to_string(); |         inner.socket_address = address.to_string(); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn connect_agent_server(&self) -> Result<()> { |     pub(crate) async fn connect_agent_server(&self) -> Result<()> { | ||||||
|         let mut inner = self.inner.lock().await; |         let mut inner = self.inner.write().await; | ||||||
|  |  | ||||||
|         let config = sock::ConnectConfig::new( |         let config = sock::ConnectConfig::new( | ||||||
|             inner.config.dial_timeout_ms as u64, |             inner.config.dial_timeout_ms as u64, | ||||||
| @@ -107,7 +108,7 @@ impl KataAgent { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn start_log_forwarder(&self) -> Result<()> { |     pub(crate) async fn start_log_forwarder(&self) -> Result<()> { | ||||||
|         let mut inner = self.inner.lock().await; |         let mut inner = self.inner.write().await; | ||||||
|         let config = sock::ConnectConfig::new( |         let config = sock::ConnectConfig::new( | ||||||
|             inner.config.dial_timeout_ms as u64, |             inner.config.dial_timeout_ms as u64, | ||||||
|             inner.config.reconnect_timeout_ms as u64, |             inner.config.reconnect_timeout_ms as u64, | ||||||
| @@ -123,17 +124,17 @@ impl KataAgent { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn stop_log_forwarder(&self) { |     pub(crate) async fn stop_log_forwarder(&self) { | ||||||
|         let mut inner = self.inner.lock().await; |         let mut inner = self.inner.write().await; | ||||||
|         inner.log_forwarder.stop(); |         inner.log_forwarder.stop(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn agent_sock(&self) -> Result<String> { |     pub(crate) async fn agent_sock(&self) -> Result<String> { | ||||||
|         let inner = self.inner.lock().await; |         let inner = self.inner.read().await; | ||||||
|         Ok(inner.socket_address.clone()) |         Ok(inner.socket_address.clone()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) async fn agent_config(&self) -> AgentConfig { |     pub(crate) async fn agent_config(&self) -> AgentConfig { | ||||||
|         let inner = self.inner.lock().await; |         let inner = self.inner.read().await; | ||||||
|         inner.config.clone() |         inner.config.clone() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user