diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index c49abd70e1..c4df5f4aec 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -428,7 +428,8 @@ async fn start_sandbox( init_attestation_components(logger, config).await?; } - let mut _oma = None; + let mut oma = None; + let mut _ort = None; if let Some(c) = &config.mem_agent { let (ma, rt) = mem_agent::agent::MemAgent::new(c.memcg_config.clone(), c.compact_config.clone()) @@ -437,11 +438,13 @@ async fn start_sandbox( e }) .context("start mem-agent")?; - _oma = Some((ma, rt)); + oma = Some(ma); + _ort = Some(rt); } // vsock:///dev/vsock, port - let mut server = rpc::start(sandbox.clone(), config.server_addr.as_str(), init_mode).await?; + let mut server = + rpc::start(sandbox.clone(), config.server_addr.as_str(), init_mode, oma).await?; server.start().await?; diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 0a1c6d34ad..2bcf79b8e8 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -181,6 +181,7 @@ impl OptionToTtrpcResult for Option { pub struct AgentService { sandbox: Arc>, init_mode: bool, + oma: Option, } impl AgentService { @@ -698,6 +699,37 @@ impl AgentService { } } +fn mem_agent_memcgconfig_to_memcg_optionconfig( + mc: &protocols::agent::MemAgentMemcgConfig, +) -> mem_agent::memcg::OptionConfig { + mem_agent::memcg::OptionConfig { + disabled: mc.disabled, + swap: mc.swap, + swappiness_max: mc.swappiness_max.map(|x| x as u8), + period_secs: mc.period_secs, + period_psi_percent_limit: mc.period_psi_percent_limit.map(|x| x as u8), + eviction_psi_percent_limit: mc.eviction_psi_percent_limit.map(|x| x as u8), + eviction_run_aging_count_min: mc.eviction_run_aging_count_min, + ..Default::default() + } +} + +fn mem_agent_compactconfig_to_compact_optionconfig( + cc: &protocols::agent::MemAgentCompactConfig, +) -> mem_agent::compact::OptionConfig { + mem_agent::compact::OptionConfig { + disabled: cc.disabled, + period_secs: cc.period_secs, + period_psi_percent_limit: cc.period_psi_percent_limit.map(|x| x as u8), + compact_psi_percent_limit: cc.compact_psi_percent_limit.map(|x| x as u8), + compact_sec_max: cc.compact_sec_max, + compact_order: cc.compact_order.map(|x| x as u8), + compact_threshold: cc.compact_threshold, + compact_force_times: cc.compact_force_times, + ..Default::default() + } +} + #[async_trait] impl agent_ttrpc::AgentService for AgentService { async fn create_container( @@ -1513,6 +1545,54 @@ impl agent_ttrpc::AgentService for AgentService { Ok(Empty::new()) } + + async fn mem_agent_memcg_set( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + config: protocols::agent::MemAgentMemcgConfig, + ) -> ::ttrpc::Result { + if let Some(ma) = &self.oma { + ma.memcg_set_config_async(mem_agent_memcgconfig_to_memcg_optionconfig(&config)) + .await + .map_err(|e| { + let estr = format!("ma.memcg_set_config_async fail: {}", e); + error!(sl(), "{}", estr); + ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, estr)) + })?; + } else { + let estr = "mem-agent is disabled"; + error!(sl(), "{}", estr); + return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc::Code::INTERNAL, + estr, + ))); + } + Ok(Empty::new()) + } + + async fn mem_agent_compact_set( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + config: protocols::agent::MemAgentCompactConfig, + ) -> ::ttrpc::Result { + if let Some(ma) = &self.oma { + ma.compact_set_config_async(mem_agent_compactconfig_to_compact_optionconfig(&config)) + .await + .map_err(|e| { + let estr = format!("ma.compact_set_config_async fail: {}", e); + error!(sl(), "{}", estr); + ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, estr)) + })?; + } else { + let estr = "mem-agent is disabled"; + error!(sl(), "{}", estr); + return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc::Code::INTERNAL, + estr, + ))); + } + Ok(Empty::new()) + } } #[derive(Clone)] @@ -1656,10 +1736,12 @@ pub async fn start( s: Arc>, server_address: &str, init_mode: bool, + oma: Option, ) -> Result { let agent_service = Box::new(AgentService { sandbox: s, init_mode, + oma, }) as Box; let aservice = agent_ttrpc::create_agent_service(Arc::new(agent_service)); @@ -2294,6 +2376,7 @@ mod tests { let agent_service = Box::new(AgentService { sandbox: Arc::new(Mutex::new(sandbox)), init_mode: true, + oma: None, }); let req = protocols::agent::UpdateInterfaceRequest::default(); @@ -2311,6 +2394,7 @@ mod tests { let agent_service = Box::new(AgentService { sandbox: Arc::new(Mutex::new(sandbox)), init_mode: true, + oma: None, }); let req = protocols::agent::UpdateRoutesRequest::default(); @@ -2328,6 +2412,7 @@ mod tests { let agent_service = Box::new(AgentService { sandbox: Arc::new(Mutex::new(sandbox)), init_mode: true, + oma: None, }); let req = protocols::agent::AddARPNeighborsRequest::default(); @@ -2466,6 +2551,7 @@ mod tests { let agent_service = Box::new(AgentService { sandbox: Arc::new(Mutex::new(sandbox)), init_mode: true, + oma: None, }); let result = agent_service @@ -2956,6 +3042,7 @@ OtherField:other let agent_service = Box::new(AgentService { sandbox: Arc::new(Mutex::new(sandbox)), init_mode: true, + oma: None, }); let ctx = mk_ttrpc_context();