agent: add support for MemAgentMemcgSet and MemAgentCompactSet

Add MemAgentMemcgSet and MemAgentCompactSet to agent API to set the config of
mem-agent memcg and compact.

Fixes: #10625

Signed-off-by: Hui Zhu <teawater@antgroup.com>
This commit is contained in:
Hui Zhu 2024-12-02 18:16:57 +08:00
parent f84ad54d97
commit 692ded8f96
2 changed files with 93 additions and 3 deletions

View File

@ -428,7 +428,8 @@ async fn start_sandbox(
init_attestation_components(logger, config).await?;
}
let mut _oma = None;
let mut oma = None;
let mut _ort = None;
if let Some(c) = &config.mem_agent {
let (ma, rt) =
mem_agent::agent::MemAgent::new(c.memcg_config.clone(), c.compact_config.clone())
@ -437,11 +438,13 @@ async fn start_sandbox(
e
})
.context("start mem-agent")?;
_oma = Some((ma, rt));
oma = Some(ma);
_ort = Some(rt);
}
// vsock:///dev/vsock, port
let mut server = rpc::start(sandbox.clone(), config.server_addr.as_str(), init_mode).await?;
let mut server =
rpc::start(sandbox.clone(), config.server_addr.as_str(), init_mode, oma).await?;
server.start().await?;

View File

@ -181,6 +181,7 @@ impl<T> OptionToTtrpcResult<T> for Option<T> {
pub struct AgentService {
sandbox: Arc<Mutex<Sandbox>>,
init_mode: bool,
oma: Option<mem_agent::agent::MemAgent>,
}
impl AgentService {
@ -698,6 +699,37 @@ impl AgentService {
}
}
fn mem_agent_memcgconfig_to_memcg_optionconfig(
mc: &protocols::agent::MemAgentMemcgConfig,
) -> mem_agent::memcg::OptionConfig {
mem_agent::memcg::OptionConfig {
disabled: mc.disabled,
swap: mc.swap,
swappiness_max: mc.swappiness_max.map(|x| x as u8),
period_secs: mc.period_secs,
period_psi_percent_limit: mc.period_psi_percent_limit.map(|x| x as u8),
eviction_psi_percent_limit: mc.eviction_psi_percent_limit.map(|x| x as u8),
eviction_run_aging_count_min: mc.eviction_run_aging_count_min,
..Default::default()
}
}
fn mem_agent_compactconfig_to_compact_optionconfig(
cc: &protocols::agent::MemAgentCompactConfig,
) -> mem_agent::compact::OptionConfig {
mem_agent::compact::OptionConfig {
disabled: cc.disabled,
period_secs: cc.period_secs,
period_psi_percent_limit: cc.period_psi_percent_limit.map(|x| x as u8),
compact_psi_percent_limit: cc.compact_psi_percent_limit.map(|x| x as u8),
compact_sec_max: cc.compact_sec_max,
compact_order: cc.compact_order.map(|x| x as u8),
compact_threshold: cc.compact_threshold,
compact_force_times: cc.compact_force_times,
..Default::default()
}
}
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
async fn create_container(
@ -1513,6 +1545,54 @@ impl agent_ttrpc::AgentService for AgentService {
Ok(Empty::new())
}
async fn mem_agent_memcg_set(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
config: protocols::agent::MemAgentMemcgConfig,
) -> ::ttrpc::Result<Empty> {
if let Some(ma) = &self.oma {
ma.memcg_set_config_async(mem_agent_memcgconfig_to_memcg_optionconfig(&config))
.await
.map_err(|e| {
let estr = format!("ma.memcg_set_config_async fail: {}", e);
error!(sl(), "{}", estr);
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, estr))
})?;
} else {
let estr = "mem-agent is disabled";
error!(sl(), "{}", estr);
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
estr,
)));
}
Ok(Empty::new())
}
async fn mem_agent_compact_set(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
config: protocols::agent::MemAgentCompactConfig,
) -> ::ttrpc::Result<Empty> {
if let Some(ma) = &self.oma {
ma.compact_set_config_async(mem_agent_compactconfig_to_compact_optionconfig(&config))
.await
.map_err(|e| {
let estr = format!("ma.compact_set_config_async fail: {}", e);
error!(sl(), "{}", estr);
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, estr))
})?;
} else {
let estr = "mem-agent is disabled";
error!(sl(), "{}", estr);
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
estr,
)));
}
Ok(Empty::new())
}
}
#[derive(Clone)]
@ -1656,10 +1736,12 @@ pub async fn start(
s: Arc<Mutex<Sandbox>>,
server_address: &str,
init_mode: bool,
oma: Option<mem_agent::agent::MemAgent>,
) -> Result<TtrpcServer> {
let agent_service = Box::new(AgentService {
sandbox: s,
init_mode,
oma,
}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let aservice = agent_ttrpc::create_agent_service(Arc::new(agent_service));
@ -2294,6 +2376,7 @@ mod tests {
let agent_service = Box::new(AgentService {
sandbox: Arc::new(Mutex::new(sandbox)),
init_mode: true,
oma: None,
});
let req = protocols::agent::UpdateInterfaceRequest::default();
@ -2311,6 +2394,7 @@ mod tests {
let agent_service = Box::new(AgentService {
sandbox: Arc::new(Mutex::new(sandbox)),
init_mode: true,
oma: None,
});
let req = protocols::agent::UpdateRoutesRequest::default();
@ -2328,6 +2412,7 @@ mod tests {
let agent_service = Box::new(AgentService {
sandbox: Arc::new(Mutex::new(sandbox)),
init_mode: true,
oma: None,
});
let req = protocols::agent::AddARPNeighborsRequest::default();
@ -2466,6 +2551,7 @@ mod tests {
let agent_service = Box::new(AgentService {
sandbox: Arc::new(Mutex::new(sandbox)),
init_mode: true,
oma: None,
});
let result = agent_service
@ -2956,6 +3042,7 @@ OtherField:other
let agent_service = Box::new(AgentService {
sandbox: Arc::new(Mutex::new(sandbox)),
init_mode: true,
oma: None,
});
let ctx = mk_ttrpc_context();