
agent: avoid unnecessary calls to Arc::clone

Each of these calls costs two extra atomic operations: one to increment
the refcount, and another to decrement it when the clone is dropped.

Since the referred-to value is guaranteed to outlive the function call,
the clones are unnecessary; remove them and pass a reference to the Arc
instead.
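
The pattern, as a minimal sketch (illustrative names only, assuming
tokio's async Mutex, matching the .lock().await calls in this diff):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    struct Sandbox;

    // Before: taking the Arc by value forces every caller to clone it;
    // the clone atomically increments the refcount on the way in and
    // decrements it again when the argument is dropped.
    async fn handler_by_value(sandbox: Arc<Mutex<Sandbox>>) {
        let _s = sandbox.lock().await;
    }

    // After: borrowing the Arc touches no refcounts at all, and is
    // sufficient whenever the caller's Arc outlives the call.
    async fn handler_by_ref(sandbox: &Arc<Mutex<Sandbox>>) {
        let _s = sandbox.lock().await;
    }

Call sites change from handler_by_value(sandbox.clone()).await to
handler_by_ref(&sandbox).await.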

Fixes: 

Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
Author: Wedson Almeida Filho
Date:   2023-06-28 01:19:09 -03:00
parent 8c03deac3a
commit c36572418f
2 changed files with 61 additions and 100 deletions
src/agent/src


@@ -230,7 +230,7 @@ pub fn baremount(
 async fn ephemeral_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     // hugetlbfs
     if storage.fstype == FS_TYPE_HUGETLB {
@@ -278,7 +278,7 @@ async fn ephemeral_storage_handler(
 pub async fn update_ephemeral_mounts(
     logger: Logger,
     storages: Vec<Storage>,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<()> {
     for (_, storage) in storages.iter().enumerate() {
         let handler_name = storage.driver.clone();
@@ -341,7 +341,7 @@ async fn overlayfs_storage_handler(
     logger: &Logger,
     storage: &Storage,
     cid: Option<&str>,
-    _sandbox: Arc<Mutex<Sandbox>>,
+    _sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     if storage
         .options
@@ -374,7 +374,7 @@ async fn overlayfs_storage_handler(
 async fn local_storage_handler(
     _logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     fs::create_dir_all(&storage.mount_point).context(format!(
         "failed to create dir all {:?}",
@@ -414,7 +414,7 @@ async fn local_storage_handler(
 async fn virtio9p_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    _sandbox: Arc<Mutex<Sandbox>>,
+    _sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     common_storage_handler(logger, storage)
 }
@@ -528,11 +528,11 @@ fn get_pagesize_and_size_from_option(options: &[String]) -> Result<(u64, u64)> {
 async fn virtiommio_blk_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     let storage = storage.clone();
     if !Path::new(&storage.source).exists() {
-        get_virtio_mmio_device_name(&sandbox, &storage.source)
+        get_virtio_mmio_device_name(sandbox, &storage.source)
             .await
             .context("failed to get mmio device name")?;
     }
@@ -545,7 +545,7 @@ async fn virtiommio_blk_storage_handler(
 async fn virtiofs_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    _sandbox: Arc<Mutex<Sandbox>>,
+    _sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     common_storage_handler(logger, storage)
 }
@@ -555,7 +555,7 @@ async fn virtiofs_storage_handler(
 async fn virtio_blk_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     let mut storage = storage.clone();
     // If hot-plugged, get the device node path based on the PCI path
@@ -570,7 +570,7 @@ async fn virtio_blk_storage_handler(
         }
     } else {
         let pcipath = pci::Path::from_str(&storage.source)?;
-        let dev_path = get_virtio_blk_pci_device_name(&sandbox, &pcipath).await?;
+        let dev_path = get_virtio_blk_pci_device_name(sandbox, &pcipath).await?;
         storage.source = dev_path;
     }
@@ -583,11 +583,11 @@ async fn virtio_blk_storage_handler(
 async fn virtio_blk_ccw_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     let mut storage = storage.clone();
     let ccw_device = ccw::Device::from_str(&storage.source)?;
-    let dev_path = get_virtio_blk_ccw_device_name(&sandbox, &ccw_device).await?;
+    let dev_path = get_virtio_blk_ccw_device_name(sandbox, &ccw_device).await?;
     storage.source = dev_path;
     common_storage_handler(logger, &storage)
 }
@@ -597,7 +597,7 @@ async fn virtio_blk_ccw_storage_handler(
 async fn virtio_blk_ccw_storage_handler(
     _: &Logger,
     _: &Storage,
-    _: Arc<Mutex<Sandbox>>,
+    _: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     Err(anyhow!("CCW is only supported on s390x"))
 }
@@ -607,12 +607,12 @@ async fn virtio_blk_ccw_storage_handler(
 async fn virtio_scsi_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     let mut storage = storage.clone();
     // Retrieve the device path from SCSI address.
-    let dev_path = get_scsi_device_name(&sandbox, &storage.source).await?;
+    let dev_path = get_scsi_device_name(sandbox, &storage.source).await?;
     storage.source = dev_path;
     common_storage_handler(logger, &storage)
@@ -633,12 +633,12 @@ fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String>
 async fn nvdimm_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
 ) -> Result<String> {
     let storage = storage.clone();
     // Retrieve the device path from NVDIMM address.
-    wait_for_pmem_device(&sandbox, &storage.source).await?;
+    wait_for_pmem_device(sandbox, &storage.source).await?;
     common_storage_handler(logger, &storage)
 }
@@ -646,7 +646,7 @@ async fn nvdimm_storage_handler(
 async fn bind_watcher_storage_handler(
     logger: &Logger,
     storage: &Storage,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
     cid: Option<String>,
 ) -> Result<()> {
     let mut locked = sandbox.lock().await;
@@ -862,7 +862,7 @@ fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) {
 pub async fn add_storages(
     logger: Logger,
     storages: Vec<Storage>,
-    sandbox: Arc<Mutex<Sandbox>>,
+    sandbox: &Arc<Mutex<Sandbox>>,
     cid: Option<String>,
 ) -> Result<Vec<String>> {
     let mut mount_list = Vec::new();
@@ -882,31 +882,22 @@ pub async fn add_storages(
         }
         let res = match handler_name.as_str() {
-            DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox.clone()).await,
-            DRIVER_BLK_CCW_TYPE => {
-                virtio_blk_ccw_storage_handler(&logger, &storage, sandbox.clone()).await
-            }
-            DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox.clone()).await,
-            DRIVER_VIRTIOFS_TYPE => {
-                virtiofs_storage_handler(&logger, &storage, sandbox.clone()).await
-            }
-            DRIVER_EPHEMERAL_TYPE => {
-                ephemeral_storage_handler(&logger, &storage, sandbox.clone()).await
-            }
+            DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, &storage, sandbox).await,
             DRIVER_OVERLAYFS_TYPE => {
-                overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox.clone()).await
+                overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox).await
             }
             DRIVER_MMIO_BLK_TYPE => {
-                virtiommio_blk_storage_handler(&logger, &storage, sandbox.clone()).await
+                virtiommio_blk_storage_handler(&logger, &storage, sandbox).await
             }
-            DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox.clone()).await,
-            DRIVER_SCSI_TYPE => {
-                virtio_scsi_storage_handler(&logger, &storage, sandbox.clone()).await
-            }
-            DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox.clone()).await,
+            DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, &storage, sandbox).await,
+            DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox).await,
             DRIVER_WATCHABLE_BIND_TYPE => {
-                bind_watcher_storage_handler(&logger, &storage, sandbox.clone(), cid.clone())
-                    .await?;
+                bind_watcher_storage_handler(&logger, &storage, sandbox, cid.clone()).await?;
                 // Don't register watch mounts, they're handled separately by the watcher.
                 Ok(String::new())
             }


@@ -150,9 +150,6 @@ impl AgentService {
         let mut oci_spec = req.OCI.clone();
         let use_sandbox_pidns = req.sandbox_pidns();
-        let sandbox;
-        let mut s;
         let mut oci = match oci_spec.as_mut() {
             Some(spec) => rustjail::grpc_to_oci(spec),
             None => {
@@ -184,15 +181,13 @@ impl AgentService {
         let m = add_storages(
             sl(),
             req.storages.to_vec(),
-            self.sandbox.clone(),
+            &self.sandbox,
             Some(req.container_id.clone()),
         )
         .await?;
-        {
-            sandbox = self.sandbox.clone();
-            s = sandbox.lock().await;
-            s.container_mounts.insert(cid.clone(), m);
-        }
+        let mut s = self.sandbox.lock().await;
+        s.container_mounts.insert(cid.clone(), m);
         update_container_namespaces(&s, &mut oci, use_sandbox_pidns)?;
@@ -265,8 +260,7 @@
     async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
         let cid = req.container_id;
-        let sandbox = self.sandbox.clone();
-        let mut s = sandbox.lock().await;
+        let mut s = self.sandbox.lock().await;
         let sid = s.id.clone();
         let ctr = s
@@ -300,8 +294,7 @@
         let cid = req.container_id.clone();
         if req.timeout == 0 {
-            let s = Arc::clone(&self.sandbox);
-            let mut sandbox = s.lock().await;
+            let mut sandbox = self.sandbox.lock().await;
             sandbox.bind_watcher.remove_container(&cid).await;
@@ -341,8 +334,7 @@
             return Err(anyhow!(nix::Error::UnknownErrno));
         }
-        let s = self.sandbox.clone();
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         remove_container_resources(&mut sandbox, &cid)?;
         Ok(())
@@ -355,8 +347,7 @@
         info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
-        let s = self.sandbox.clone();
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let mut process = req
             .process
@@ -383,7 +374,6 @@
     async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
         let cid = req.container_id.clone();
         let eid = req.exec_id.clone();
-        let s = self.sandbox.clone();
         info!(
             sl(),
@@ -395,7 +385,7 @@
         let mut sig: libc::c_int = req.signal as libc::c_int;
         {
-            let mut sandbox = s.lock().await;
+            let mut sandbox = self.sandbox.lock().await;
             let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
             // For container initProcess, if it hasn't installed handler for "SIGTERM" signal,
             // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal
@@ -468,8 +458,7 @@
     }
     async fn freeze_cgroup(&self, cid: &str, state: FreezerState) -> Result<()> {
-        let s = self.sandbox.clone();
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox
             .get_container(cid)
             .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
@@ -478,8 +467,7 @@
     }
     async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
-        let s = self.sandbox.clone();
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox
             .get_container(cid)
             .ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
@@ -494,7 +482,6 @@
     ) -> Result<protocols::agent::WaitProcessResponse> {
         let cid = req.container_id.clone();
         let eid = req.exec_id;
-        let s = self.sandbox.clone();
         let mut resp = WaitProcessResponse::new();
         let pid: pid_t;
@@ -508,7 +495,7 @@
         );
         let exit_rx = {
-            let mut sandbox = s.lock().await;
+            let mut sandbox = self.sandbox.lock().await;
             let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
             p.exit_watchers.push(exit_send);
@@ -523,7 +510,7 @@
             info!(sl(), "cid {} eid {} received exit signal", &cid, &eid);
         }
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox
             .get_container(&cid)
             .ok_or_else(|| anyhow!("Invalid container id"))?;
@@ -565,8 +552,7 @@
         let eid = req.exec_id.clone();
         let writer = {
-            let s = self.sandbox.clone();
-            let mut sandbox = s.lock().await;
+            let mut sandbox = self.sandbox.lock().await;
             let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
             // use ptmx io
@@ -597,9 +583,7 @@
         let term_exit_notifier;
         let reader = {
-            let s = self.sandbox.clone();
-            let mut sandbox = s.lock().await;
+            let mut sandbox = self.sandbox.lock().await;
             let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
             term_exit_notifier = p.term_exit_notifier.clone();
@@ -630,7 +614,7 @@
                 // Poll::Ready so that the term_exit_notifier will never polled
                 // before all data were read.
                 biased;
-                v = read_stream(reader, req.len as usize) => {
+                v = read_stream(&reader, req.len as usize) => {
                     let vector = v?;
                     let mut resp = ReadStreamResponse::new();
                     resp.set_data(vector);
@@ -734,8 +718,7 @@ impl agent_ttrpc::AgentService for AgentService {
         let cid = req.container_id.clone();
         let res = req.resources;
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox.get_container(&cid).ok_or_else(|| {
             ttrpc_error(
@@ -768,8 +751,7 @@ impl agent_ttrpc::AgentService for AgentService {
         trace_rpc_call!(ctx, "stats_container", req);
         is_allowed(&req)?;
         let cid = req.container_id;
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox.get_container(&cid).ok_or_else(|| {
             ttrpc_error(
@@ -790,8 +772,7 @@
         trace_rpc_call!(ctx, "pause_container", req);
         is_allowed(&req)?;
         let cid = req.container_id();
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox.get_container(cid).ok_or_else(|| {
             ttrpc_error(
@@ -814,8 +795,7 @@
         trace_rpc_call!(ctx, "resume_container", req);
         is_allowed(&req)?;
         let cid = req.container_id();
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let ctr = sandbox.get_container(cid).ok_or_else(|| {
             ttrpc_error(
@@ -896,8 +876,7 @@
         let cid = req.container_id.clone();
         let eid = req.exec_id;
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let p = sandbox
             .find_container_process(cid.as_str(), eid.as_str())
@@ -923,8 +902,7 @@
         let cid = req.container_id.clone();
         let eid = req.exec_id.clone();
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         let p = sandbox
             .find_container_process(cid.as_str(), eid.as_str())
             .map_err(|e| {
@@ -1028,7 +1006,7 @@
         trace_rpc_call!(ctx, "update_mounts", req);
         is_allowed(&req)?;
-        match update_ephemeral_mounts(sl(), req.storages.to_vec(), self.sandbox.clone()).await {
+        match update_ephemeral_mounts(sl(), req.storages.to_vec(), &self.sandbox).await {
             Ok(_) => Ok(Empty::new()),
             Err(e) => Err(ttrpc_error(
                 ttrpc::Code::INTERNAL,
@@ -1251,8 +1229,7 @@
         is_allowed(&req)?;
         {
-            let sandbox = self.sandbox.clone();
-            let mut s = sandbox.lock().await;
+            let mut s = self.sandbox.lock().await;
             let _ = fs::remove_dir_all(CONTAINER_BASE);
             let _ = fs::create_dir_all(CONTAINER_BASE);
@@ -1282,19 +1259,16 @@
                 .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
         }
-        match add_storages(sl(), req.storages.to_vec(), self.sandbox.clone(), None).await {
+        match add_storages(sl(), req.storages.to_vec(), &self.sandbox, None).await {
             Ok(m) => {
-                let sandbox = self.sandbox.clone();
-                let mut s = sandbox.lock().await;
-                s.mounts = m
+                self.sandbox.lock().await.mounts = m;
             }
             Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
         };
         match setup_guest_dns(sl(), req.dns.to_vec()) {
             Ok(_) => {
-                let sandbox = self.sandbox.clone();
-                let mut s = sandbox.lock().await;
+                let mut s = self.sandbox.lock().await;
                 let _dns = req
                     .dns
                     .to_vec()
@@ -1315,8 +1289,7 @@
         trace_rpc_call!(ctx, "destroy_sandbox", req);
         is_allowed(&req)?;
-        let s = Arc::clone(&self.sandbox);
-        let mut sandbox = s.lock().await;
+        let mut sandbox = self.sandbox.lock().await;
         // destroy all containers, clean up, notify agent to exit
         // etc.
         sandbox
@@ -1383,8 +1356,7 @@
         req: protocols::agent::OnlineCPUMemRequest,
     ) -> ttrpc::Result<Empty> {
         is_allowed(&req)?;
-        let s = Arc::clone(&self.sandbox);
-        let sandbox = s.lock().await;
+        let sandbox = self.sandbox.lock().await;
         trace_rpc_call!(ctx, "online_cpu_mem", req);
         sandbox
@@ -1507,12 +1479,10 @@
         req: protocols::agent::GetOOMEventRequest,
     ) -> ttrpc::Result<OOMEvent> {
         is_allowed(&req)?;
-        let sandbox = self.sandbox.clone();
-        let s = sandbox.lock().await;
+        let s = self.sandbox.lock().await;
         let event_rx = &s.event_rx.clone();
         let mut event_rx = event_rx.lock().await;
         drop(s);
-        drop(sandbox);
         if let Some(container_id) = event_rx.recv().await {
             info!(sl(), "get_oom_event return {}", &container_id);
@@ -1709,7 +1679,7 @@ fn get_agent_details() -> AgentDetails {
     detail
 }
-async fn read_stream(reader: Arc<Mutex<ReadHalf<PipeStream>>>, l: usize) -> Result<Vec<u8>> {
+async fn read_stream(reader: &Mutex<ReadHalf<PipeStream>>, l: usize) -> Result<Vec<u8>> {
     let mut content = vec![0u8; l];
     let mut reader = reader.lock().await;