Merge pull request #7191 from wedsonaf/avoid-clones

agent: avoid unnecessary calls to `Arc::clone`
This commit is contained in:
Fabiano Fidêncio
2023-08-06 15:34:07 +02:00
committed by GitHub
2 changed files with 61 additions and 100 deletions

View File

@@ -230,7 +230,7 @@ pub fn baremount(
async fn ephemeral_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
// hugetlbfs
if storage.fstype == FS_TYPE_HUGETLB {
@@ -278,7 +278,7 @@ async fn ephemeral_storage_handler(
pub async fn update_ephemeral_mounts(
logger: Logger,
storages: Vec<Storage>,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<()> {
for (_, storage) in storages.iter().enumerate() {
let handler_name = storage.driver.clone();
@@ -341,7 +341,7 @@ async fn overlayfs_storage_handler(
logger: &Logger,
storage: &Storage,
cid: Option<&str>,
_sandbox: Arc<Mutex<Sandbox>>,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
if storage
.options
@@ -374,7 +374,7 @@ async fn overlayfs_storage_handler(
async fn local_storage_handler(
_logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
fs::create_dir_all(&storage.mount_point).context(format!(
"failed to create dir all {:?}",
@@ -414,7 +414,7 @@ async fn local_storage_handler(
async fn virtio9p_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: Arc<Mutex<Sandbox>>,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
common_storage_handler(logger, storage)
}
@@ -528,11 +528,11 @@ fn get_pagesize_and_size_from_option(options: &[String]) -> Result<(u64, u64)> {
async fn virtiommio_blk_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let storage = storage.clone();
if !Path::new(&storage.source).exists() {
get_virtio_mmio_device_name(&sandbox, &storage.source)
get_virtio_mmio_device_name(sandbox, &storage.source)
.await
.context("failed to get mmio device name")?;
}
@@ -545,7 +545,7 @@ async fn virtiommio_blk_storage_handler(
async fn virtiofs_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: Arc<Mutex<Sandbox>>,
_sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
common_storage_handler(logger, storage)
}
@@ -555,7 +555,7 @@ async fn virtiofs_storage_handler(
async fn virtio_blk_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
// If hot-plugged, get the device node path based on the PCI path
@@ -570,7 +570,7 @@ async fn virtio_blk_storage_handler(
}
} else {
let pcipath = pci::Path::from_str(&storage.source)?;
let dev_path = get_virtio_blk_pci_device_name(&sandbox, &pcipath).await?;
let dev_path = get_virtio_blk_pci_device_name(sandbox, &pcipath).await?;
storage.source = dev_path;
}
@@ -583,11 +583,11 @@ async fn virtio_blk_storage_handler(
async fn virtio_blk_ccw_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
let ccw_device = ccw::Device::from_str(&storage.source)?;
let dev_path = get_virtio_blk_ccw_device_name(&sandbox, &ccw_device).await?;
let dev_path = get_virtio_blk_ccw_device_name(sandbox, &ccw_device).await?;
storage.source = dev_path;
common_storage_handler(logger, &storage)
}
@@ -597,7 +597,7 @@ async fn virtio_blk_ccw_storage_handler(
async fn virtio_blk_ccw_storage_handler(
_: &Logger,
_: &Storage,
_: Arc<Mutex<Sandbox>>,
_: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
Err(anyhow!("CCW is only supported on s390x"))
}
@@ -607,12 +607,12 @@ async fn virtio_blk_ccw_storage_handler(
async fn virtio_scsi_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let mut storage = storage.clone();
// Retrieve the device path from SCSI address.
let dev_path = get_scsi_device_name(&sandbox, &storage.source).await?;
let dev_path = get_scsi_device_name(sandbox, &storage.source).await?;
storage.source = dev_path;
common_storage_handler(logger, &storage)
@@ -633,12 +633,12 @@ fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String>
async fn nvdimm_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
) -> Result<String> {
let storage = storage.clone();
// Retrieve the device path from NVDIMM address.
wait_for_pmem_device(&sandbox, &storage.source).await?;
wait_for_pmem_device(sandbox, &storage.source).await?;
common_storage_handler(logger, &storage)
}
@@ -646,7 +646,7 @@ async fn nvdimm_storage_handler(
async fn bind_watcher_storage_handler(
logger: &Logger,
storage: &Storage,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
cid: Option<String>,
) -> Result<()> {
let mut locked = sandbox.lock().await;
@@ -862,7 +862,7 @@ fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) {
pub async fn add_storages(
logger: Logger,
storages: Vec<Storage>,
sandbox: Arc<Mutex<Sandbox>>,
sandbox: &Arc<Mutex<Sandbox>>,
cid: Option<String>,
) -> Result<Vec<String>> {
let mut mount_list = Vec::new();
@@ -882,31 +882,22 @@ pub async fn add_storages(
}
let res = match handler_name.as_str() {
DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox.clone()).await,
DRIVER_BLK_CCW_TYPE => {
virtio_blk_ccw_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox.clone()).await,
DRIVER_VIRTIOFS_TYPE => {
virtiofs_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_EPHEMERAL_TYPE => {
ephemeral_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox).await,
DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, &storage, sandbox).await,
DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox).await,
DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, &storage, sandbox).await,
DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, &storage, sandbox).await,
DRIVER_OVERLAYFS_TYPE => {
overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox.clone()).await
overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox).await
}
DRIVER_MMIO_BLK_TYPE => {
virtiommio_blk_storage_handler(&logger, &storage, sandbox.clone()).await
virtiommio_blk_storage_handler(&logger, &storage, sandbox).await
}
DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox.clone()).await,
DRIVER_SCSI_TYPE => {
virtio_scsi_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox.clone()).await,
DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox).await,
DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, &storage, sandbox).await,
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox).await,
DRIVER_WATCHABLE_BIND_TYPE => {
bind_watcher_storage_handler(&logger, &storage, sandbox.clone(), cid.clone())
.await?;
bind_watcher_storage_handler(&logger, &storage, sandbox, cid.clone()).await?;
// Don't register watch mounts, they're handled separately by the watcher.
Ok(String::new())
}

View File

@@ -150,9 +150,6 @@ impl AgentService {
let mut oci_spec = req.OCI.clone();
let use_sandbox_pidns = req.sandbox_pidns();
let sandbox;
let mut s;
let mut oci = match oci_spec.as_mut() {
Some(spec) => rustjail::grpc_to_oci(spec),
None => {
@@ -184,15 +181,13 @@ impl AgentService {
let m = add_storages(
sl(),
req.storages.to_vec(),
self.sandbox.clone(),
&self.sandbox,
Some(req.container_id.clone()),
)
.await?;
{
sandbox = self.sandbox.clone();
s = sandbox.lock().await;
s.container_mounts.insert(cid.clone(), m);
}
let mut s = self.sandbox.lock().await;
s.container_mounts.insert(cid.clone(), m);
update_container_namespaces(&s, &mut oci, use_sandbox_pidns)?;
@@ -265,8 +260,7 @@ impl AgentService {
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
let cid = req.container_id;
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await;
let mut s = self.sandbox.lock().await;
let sid = s.id.clone();
let ctr = s
@@ -300,8 +294,7 @@ impl AgentService {
let cid = req.container_id.clone();
if req.timeout == 0 {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
sandbox.bind_watcher.remove_container(&cid).await;
@@ -341,8 +334,7 @@ impl AgentService {
return Err(anyhow!(nix::Error::UnknownErrno));
}
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
remove_container_resources(&mut sandbox, &cid)?;
Ok(())
@@ -355,8 +347,7 @@ impl AgentService {
info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let mut process = req
.process
@@ -383,7 +374,6 @@ impl AgentService {
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
let cid = req.container_id.clone();
let eid = req.exec_id.clone();
let s = self.sandbox.clone();
info!(
sl(),
@@ -395,7 +385,7 @@ impl AgentService {
let mut sig: libc::c_int = req.signal as libc::c_int;
{
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
// For container initProcess, if it hasn't installed handler for "SIGTERM" signal,
// it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal
@@ -468,8 +458,7 @@ impl AgentService {
}
async fn freeze_cgroup(&self, cid: &str, state: FreezerState) -> Result<()> {
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox
.get_container(cid)
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
@@ -478,8 +467,7 @@ impl AgentService {
}
async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox
.get_container(cid)
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
@@ -494,7 +482,6 @@ impl AgentService {
) -> Result<protocols::agent::WaitProcessResponse> {
let cid = req.container_id.clone();
let eid = req.exec_id;
let s = self.sandbox.clone();
let mut resp = WaitProcessResponse::new();
let pid: pid_t;
@@ -508,7 +495,7 @@ impl AgentService {
);
let exit_rx = {
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
p.exit_watchers.push(exit_send);
@@ -523,7 +510,7 @@ impl AgentService {
info!(sl(), "cid {} eid {} received exit signal", &cid, &eid);
}
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox
.get_container(&cid)
.ok_or_else(|| anyhow!("Invalid container id"))?;
@@ -565,8 +552,7 @@ impl AgentService {
let eid = req.exec_id.clone();
let writer = {
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
// use ptmx io
@@ -597,9 +583,7 @@ impl AgentService {
let term_exit_notifier;
let reader = {
let s = self.sandbox.clone();
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
term_exit_notifier = p.term_exit_notifier.clone();
@@ -630,7 +614,7 @@ impl AgentService {
// Poll::Ready so that the term_exit_notifier will never polled
// before all data were read.
biased;
v = read_stream(reader, req.len as usize) => {
v = read_stream(&reader, req.len as usize) => {
let vector = v?;
let mut resp = ReadStreamResponse::new();
resp.set_data(vector);
@@ -734,8 +718,7 @@ impl agent_ttrpc::AgentService for AgentService {
let cid = req.container_id.clone();
let res = req.resources;
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
ttrpc_error(
@@ -768,8 +751,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "stats_container", req);
is_allowed(&req)?;
let cid = req.container_id;
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
ttrpc_error(
@@ -790,8 +772,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "pause_container", req);
is_allowed(&req)?;
let cid = req.container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| {
ttrpc_error(
@@ -814,8 +795,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "resume_container", req);
is_allowed(&req)?;
let cid = req.container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let ctr = sandbox.get_container(cid).ok_or_else(|| {
ttrpc_error(
@@ -896,8 +876,7 @@ impl agent_ttrpc::AgentService for AgentService {
let cid = req.container_id.clone();
let eid = req.exec_id;
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox
.find_container_process(cid.as_str(), eid.as_str())
@@ -923,8 +902,7 @@ impl agent_ttrpc::AgentService for AgentService {
let cid = req.container_id.clone();
let eid = req.exec_id.clone();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
let p = sandbox
.find_container_process(cid.as_str(), eid.as_str())
.map_err(|e| {
@@ -1028,7 +1006,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "update_mounts", req);
is_allowed(&req)?;
match update_ephemeral_mounts(sl(), req.storages.to_vec(), self.sandbox.clone()).await {
match update_ephemeral_mounts(sl(), req.storages.to_vec(), &self.sandbox).await {
Ok(_) => Ok(Empty::new()),
Err(e) => Err(ttrpc_error(
ttrpc::Code::INTERNAL,
@@ -1251,8 +1229,7 @@ impl agent_ttrpc::AgentService for AgentService {
is_allowed(&req)?;
{
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await;
let mut s = self.sandbox.lock().await;
let _ = fs::remove_dir_all(CONTAINER_BASE);
let _ = fs::create_dir_all(CONTAINER_BASE);
@@ -1282,19 +1259,16 @@ impl agent_ttrpc::AgentService for AgentService {
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
}
match add_storages(sl(), req.storages.to_vec(), self.sandbox.clone(), None).await {
match add_storages(sl(), req.storages.to_vec(), &self.sandbox, None).await {
Ok(m) => {
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await;
s.mounts = m
self.sandbox.lock().await.mounts = m;
}
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
};
match setup_guest_dns(sl(), req.dns.to_vec()) {
Ok(_) => {
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().await;
let mut s = self.sandbox.lock().await;
let _dns = req
.dns
.to_vec()
@@ -1315,8 +1289,7 @@ impl agent_ttrpc::AgentService for AgentService {
trace_rpc_call!(ctx, "destroy_sandbox", req);
is_allowed(&req)?;
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().await;
let mut sandbox = self.sandbox.lock().await;
// destroy all containers, clean up, notify agent to exit
// etc.
sandbox
@@ -1383,8 +1356,7 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::OnlineCPUMemRequest,
) -> ttrpc::Result<Empty> {
is_allowed(&req)?;
let s = Arc::clone(&self.sandbox);
let sandbox = s.lock().await;
let sandbox = self.sandbox.lock().await;
trace_rpc_call!(ctx, "online_cpu_mem", req);
sandbox
@@ -1507,12 +1479,10 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::GetOOMEventRequest,
) -> ttrpc::Result<OOMEvent> {
is_allowed(&req)?;
let sandbox = self.sandbox.clone();
let s = sandbox.lock().await;
let s = self.sandbox.lock().await;
let event_rx = &s.event_rx.clone();
let mut event_rx = event_rx.lock().await;
drop(s);
drop(sandbox);
if let Some(container_id) = event_rx.recv().await {
info!(sl(), "get_oom_event return {}", &container_id);
@@ -1709,7 +1679,7 @@ fn get_agent_details() -> AgentDetails {
detail
}
async fn read_stream(reader: Arc<Mutex<ReadHalf<PipeStream>>>, l: usize) -> Result<Vec<u8>> {
async fn read_stream(reader: &Mutex<ReadHalf<PipeStream>>, l: usize) -> Result<Vec<u8>> {
let mut content = vec![0u8; l];
let mut reader = reader.lock().await;