agent: avoid clone objects when possible

Optimize agent rpc implementation by:
- avoid clone objects when possible
- avoid unwrap() when possible
- explictly drop object to ensure order

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-04 11:26:05 +08:00
parent b098960442
commit 84badd89d7

View File

@ -169,7 +169,7 @@ impl AgentService {
// updates the devices listed in the OCI spec, so that they actually // updates the devices listed in the OCI spec, so that they actually
// match real devices inside the VM. This step is necessary since we // match real devices inside the VM. This step is necessary since we
// cannot predict everything from the caller. // cannot predict everything from the caller.
add_devices(&req.devices.to_vec(), &mut oci, &self.sandbox).await?; add_devices(&req.devices, &mut oci, &self.sandbox).await?;
// Both rootfs and volumes (invoked with --volume for instance) will // Both rootfs and volumes (invoked with --volume for instance) will
// be processed the same way. The idea is to always mount any provided // be processed the same way. The idea is to always mount any provided
@ -180,7 +180,7 @@ impl AgentService {
// list) to bind mount all of them inside the container. // list) to bind mount all of them inside the container.
let m = add_storages( let m = add_storages(
sl(), sl(),
req.storages.to_vec(), req.storages,
&self.sandbox, &self.sandbox,
Some(req.container_id.clone()), Some(req.container_id.clone()),
) )
@ -258,15 +258,13 @@ impl AgentService {
#[instrument] #[instrument]
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
let cid = req.container_id;
let mut s = self.sandbox.lock().await; let mut s = self.sandbox.lock().await;
let sid = s.id.clone(); let sid = s.id.clone();
let cid = req.container_id;
let ctr = s let ctr = s
.get_container(&cid) .get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?; .ok_or_else(|| anyhow!("Invalid container id"))?;
ctr.exec().await?; ctr.exec().await?;
if sid == cid { if sid == cid {
@ -274,13 +272,9 @@ impl AgentService {
} }
// start oom event loop // start oom event loop
if let Ok(cg_path) = ctr.cgroup_manager.as_ref().get_cgroup_path("memory") {
let cg_path = ctr.cgroup_manager.as_ref().get_cgroup_path("memory");
if let Ok(cg_path) = cg_path {
let rx = notifier::notify_oom(cid.as_str(), cg_path.to_string()).await?; let rx = notifier::notify_oom(cid.as_str(), cg_path.to_string()).await?;
s.run_oom_event_monitor(rx, cid).await;
s.run_oom_event_monitor(rx, cid.clone()).await;
} }
Ok(()) Ok(())
@ -291,7 +285,7 @@ impl AgentService {
&self, &self,
req: protocols::agent::RemoveContainerRequest, req: protocols::agent::RemoveContainerRequest,
) -> Result<()> { ) -> Result<()> {
let cid = req.container_id.clone(); let cid = req.container_id;
if req.timeout == 0 { if req.timeout == 0 {
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
@ -329,26 +323,20 @@ impl AgentService {
{ {
return Err(anyhow!(nix::Error::ETIME)); return Err(anyhow!(nix::Error::ETIME));
} }
handle.await?;
if handle.await.is_err() {
return Err(anyhow!(nix::Error::UnknownErrno));
}
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
remove_container_resources(&mut sandbox, &cid)?; remove_container_resources(&mut sandbox, &cid)
Ok(())
} }
#[instrument] #[instrument]
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> { async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
let cid = req.container_id.clone(); let cid = req.container_id;
let exec_id = req.exec_id.clone(); let exec_id = req.exec_id;
info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id); info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let mut process = req let mut process = req
.process .process
.into_option() .into_option()
@ -365,21 +353,19 @@ impl AgentService {
.get_container(&cid) .get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?; .ok_or_else(|| anyhow!("Invalid container id"))?;
ctr.run(p).await?; ctr.run(p).await
Ok(())
} }
#[instrument] #[instrument]
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
let cid = req.container_id.clone(); let cid = req.container_id;
let eid = req.exec_id.clone(); let eid = req.exec_id;
info!( info!(
sl(), sl(),
"signal process"; "signal process";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
"signal" => req.signal, "signal" => req.signal,
); );
@ -400,8 +386,8 @@ impl AgentService {
info!( info!(
sl(), sl(),
"signal encounter ESRCH, continue"; "signal encounter ESRCH, continue";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
"pid" => p.pid, "pid" => p.pid,
"signal" => sig, "signal" => sig,
); );
@ -416,16 +402,16 @@ impl AgentService {
info!( info!(
sl(), sl(),
"signal all the remaining processes"; "signal all the remaining processes";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
); );
if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await { if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await {
warn!( warn!(
sl(), sl(),
"freeze cgroup failed"; "freeze cgroup failed";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
"error" => format!("{:?}", err), "error" => format!("{:?}", err),
); );
} }
@ -437,8 +423,8 @@ impl AgentService {
warn!( warn!(
sl(), sl(),
"signal failed"; "signal failed";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
"pid" => pid, "pid" => pid,
"error" => format!("{:?}", err), "error" => format!("{:?}", err),
); );
@ -448,12 +434,13 @@ impl AgentService {
warn!( warn!(
sl(), sl(),
"unfreeze cgroup failed"; "unfreeze cgroup failed";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone(), "exec-id" => &eid,
"error" => format!("{:?}", err), "error" => format!("{:?}", err),
); );
} }
} }
Ok(()) Ok(())
} }
@ -462,8 +449,7 @@ impl AgentService {
let ctr = sandbox let ctr = sandbox
.get_container(cid) .get_container(cid)
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?; .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
ctr.cgroup_manager.as_ref().freeze(state)?; ctr.cgroup_manager.as_ref().freeze(state)
Ok(())
} }
async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> { async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
@ -471,8 +457,7 @@ impl AgentService {
let ctr = sandbox let ctr = sandbox
.get_container(cid) .get_container(cid)
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?; .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
let pids = ctr.cgroup_manager.as_ref().get_pids()?; ctr.cgroup_manager.as_ref().get_pids()
Ok(pids)
} }
#[instrument] #[instrument]
@ -480,20 +465,19 @@ impl AgentService {
&self, &self,
req: protocols::agent::WaitProcessRequest, req: protocols::agent::WaitProcessRequest,
) -> Result<protocols::agent::WaitProcessResponse> { ) -> Result<protocols::agent::WaitProcessResponse> {
let cid = req.container_id.clone(); let cid = req.container_id;
let eid = req.exec_id; let eid = req.exec_id;
let mut resp = WaitProcessResponse::new(); let mut resp = WaitProcessResponse::new();
let pid: pid_t;
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
info!( info!(
sl(), sl(),
"wait process"; "wait process";
"container-id" => cid.clone(), "container-id" => &cid,
"exec-id" => eid.clone() "exec-id" => &eid
); );
let pid: pid_t;
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
let exit_rx = { let exit_rx = {
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
@ -548,8 +532,8 @@ impl AgentService {
&self, &self,
req: protocols::agent::WriteStreamRequest, req: protocols::agent::WriteStreamRequest,
) -> Result<protocols::agent::WriteStreamResponse> { ) -> Result<protocols::agent::WriteStreamResponse> {
let cid = req.container_id.clone(); let cid = req.container_id;
let eid = req.exec_id.clone(); let eid = req.exec_id;
let writer = { let writer = {
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
@ -601,10 +585,6 @@ impl AgentService {
} }
}; };
if reader.is_none() {
return Err(anyhow!("Unable to determine stream reader, is None"));
}
let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?; let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?;
tokio::select! { tokio::select! {
@ -663,7 +643,6 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_container", req); trace_rpc_call!(ctx, "remove_container", req);
is_allowed(&req)?; is_allowed(&req)?;
match self.do_remove_container(req).await { match self.do_remove_container(req).await {
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
Ok(_) => Ok(Empty::new()), Ok(_) => Ok(Empty::new()),
@ -715,32 +694,23 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "update_container", req); trace_rpc_call!(ctx, "update_container", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id.clone();
let res = req.resources;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(), "invalid container id".to_string(),
) )
})?; })?;
let resp = Empty::new(); if let Some(res) = req.resources.as_ref() {
if let Some(res) = res.as_ref() {
let oci_res = rustjail::resources_grpc_to_oci(res); let oci_res = rustjail::resources_grpc_to_oci(res);
match ctr.set(oci_res) { if let Err(e) = ctr.set(oci_res) {
Err(e) => {
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
} }
Ok(_) => return Ok(resp),
}
} }
Ok(resp) Ok(Empty::new())
} }
async fn stats_container( async fn stats_container(
@ -750,10 +720,9 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<StatsContainerResponse> { ) -> ttrpc::Result<StatsContainerResponse> {
trace_rpc_call!(ctx, "stats_container", req); trace_rpc_call!(ctx, "stats_container", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&cid).ok_or_else(|| { let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(), "invalid container id".to_string(),
@ -771,10 +740,9 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "pause_container", req); trace_rpc_call!(ctx, "pause_container", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id();
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| { let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(), "invalid container id".to_string(),
@ -794,10 +762,9 @@ impl agent_ttrpc::AgentService for AgentService {
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
trace_rpc_call!(ctx, "resume_container", req); trace_rpc_call!(ctx, "resume_container", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id();
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| { let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(), "invalid container id".to_string(),
@ -874,7 +841,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "close_stdin", req); trace_rpc_call!(ctx, "close_stdin", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id.clone(); let cid = req.container_id;
let eid = req.exec_id; let eid = req.exec_id;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
@ -900,11 +867,9 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "tty_win_resize", req); trace_rpc_call!(ctx, "tty_win_resize", req);
is_allowed(&req)?; is_allowed(&req)?;
let cid = req.container_id.clone();
let eid = req.exec_id.clone();
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
let p = sandbox let p = sandbox
.find_container_process(cid.as_str(), eid.as_str()) .find_container_process(req.container_id(), req.exec_id())
.map_err(|e| { .map_err(|e| {
ttrpc_error( ttrpc_error(
ttrpc::Code::UNAVAILABLE, ttrpc::Code::UNAVAILABLE,
@ -1290,15 +1255,14 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?; is_allowed(&req)?;
let mut sandbox = self.sandbox.lock().await; let mut sandbox = self.sandbox.lock().await;
// destroy all containers, clean up, notify agent to exit // destroy all containers, clean up, notify agent to exit etc.
// etc.
sandbox sandbox
.destroy() .destroy()
.await .await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
// Close get_oom_event connection, // Close get_oom_event connection,
// otherwise it will block the shutdown of ttrpc. // otherwise it will block the shutdown of ttrpc.
sandbox.event_tx.take(); drop(sandbox.event_tx.take());
sandbox sandbox
.sender .sender
@ -1355,9 +1319,9 @@ impl agent_ttrpc::AgentService for AgentService {
ctx: &TtrpcContext, ctx: &TtrpcContext,
req: protocols::agent::OnlineCPUMemRequest, req: protocols::agent::OnlineCPUMemRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "online_cpu_mem", req);
is_allowed(&req)?; is_allowed(&req)?;
let sandbox = self.sandbox.lock().await; let sandbox = self.sandbox.lock().await;
trace_rpc_call!(ctx, "online_cpu_mem", req);
sandbox sandbox
.online_cpu_memory(&req) .online_cpu_memory(&req)
@ -1506,7 +1470,6 @@ impl agent_ttrpc::AgentService for AgentService {
info!(sl(), "get volume stats!"); info!(sl(), "get volume stats!");
let mut resp = VolumeStatsResponse::new(); let mut resp = VolumeStatsResponse::new();
let mut condition = VolumeCondition::new(); let mut condition = VolumeCondition::new();
match File::open(&req.volume_guest_path) { match File::open(&req.volume_guest_path) {
@ -1698,15 +1661,10 @@ pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str, init_mode: bool) -> R
sandbox: s, sandbox: s,
init_mode, init_mode,
}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>; }) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let aservice = agent_ttrpc::create_agent_service(Arc::new(agent_service));
let agent_worker = Arc::new(agent_service);
let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>; let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let health_worker = Arc::new(health_service); let hservice = health_ttrpc::create_health(Arc::new(health_service));
let aservice = agent_ttrpc::create_agent_service(agent_worker);
let hservice = health_ttrpc::create_health(health_worker);
let server = TtrpcServer::new() let server = TtrpcServer::new()
.bind(server_address)? .bind(server_address)?
@ -1750,6 +1708,7 @@ fn update_container_namespaces(
continue; continue;
} }
} }
// update pid namespace // update pid namespace
let mut pid_ns = LinuxNamespace { let mut pid_ns = LinuxNamespace {
r#type: NSTYPEPID.to_string(), r#type: NSTYPEPID.to_string(),