diff --git a/src/agent/src/mount.rs b/src/agent/src/mount.rs index 83f624f98c..84227b1821 100644 --- a/src/agent/src/mount.rs +++ b/src/agent/src/mount.rs @@ -230,7 +230,7 @@ pub fn baremount( async fn ephemeral_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { // hugetlbfs if storage.fstype == FS_TYPE_HUGETLB { @@ -278,7 +278,7 @@ async fn ephemeral_storage_handler( pub async fn update_ephemeral_mounts( logger: Logger, storages: Vec, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result<()> { for (_, storage) in storages.iter().enumerate() { let handler_name = storage.driver.clone(); @@ -341,7 +341,7 @@ async fn overlayfs_storage_handler( logger: &Logger, storage: &Storage, cid: Option<&str>, - _sandbox: Arc>, + _sandbox: &Arc>, ) -> Result { if storage .options @@ -374,7 +374,7 @@ async fn overlayfs_storage_handler( async fn local_storage_handler( _logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { fs::create_dir_all(&storage.mount_point).context(format!( "failed to create dir all {:?}", @@ -414,7 +414,7 @@ async fn local_storage_handler( async fn virtio9p_storage_handler( logger: &Logger, storage: &Storage, - _sandbox: Arc>, + _sandbox: &Arc>, ) -> Result { common_storage_handler(logger, storage) } @@ -528,11 +528,11 @@ fn get_pagesize_and_size_from_option(options: &[String]) -> Result<(u64, u64)> { async fn virtiommio_blk_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { let storage = storage.clone(); if !Path::new(&storage.source).exists() { - get_virtio_mmio_device_name(&sandbox, &storage.source) + get_virtio_mmio_device_name(sandbox, &storage.source) .await .context("failed to get mmio device name")?; } @@ -545,7 +545,7 @@ async fn virtiommio_blk_storage_handler( async fn virtiofs_storage_handler( logger: &Logger, storage: &Storage, - _sandbox: Arc>, + _sandbox: &Arc>, ) -> Result { common_storage_handler(logger, storage) } @@ -555,7 +555,7 @@ async fn virtiofs_storage_handler( async fn virtio_blk_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { let mut storage = storage.clone(); // If hot-plugged, get the device node path based on the PCI path @@ -570,7 +570,7 @@ async fn virtio_blk_storage_handler( } } else { let pcipath = pci::Path::from_str(&storage.source)?; - let dev_path = get_virtio_blk_pci_device_name(&sandbox, &pcipath).await?; + let dev_path = get_virtio_blk_pci_device_name(sandbox, &pcipath).await?; storage.source = dev_path; } @@ -583,11 +583,11 @@ async fn virtio_blk_storage_handler( async fn virtio_blk_ccw_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { let mut storage = storage.clone(); let ccw_device = ccw::Device::from_str(&storage.source)?; - let dev_path = get_virtio_blk_ccw_device_name(&sandbox, &ccw_device).await?; + let dev_path = get_virtio_blk_ccw_device_name(sandbox, &ccw_device).await?; storage.source = dev_path; common_storage_handler(logger, &storage) } @@ -597,7 +597,7 @@ async fn virtio_blk_ccw_storage_handler( async fn virtio_blk_ccw_storage_handler( _: &Logger, _: &Storage, - _: Arc>, + _: &Arc>, ) -> Result { Err(anyhow!("CCW is only supported on s390x")) } @@ -607,12 +607,12 @@ async fn virtio_blk_ccw_storage_handler( async fn virtio_scsi_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { let mut storage = storage.clone(); // Retrieve the device path from SCSI address. - let dev_path = get_scsi_device_name(&sandbox, &storage.source).await?; + let dev_path = get_scsi_device_name(sandbox, &storage.source).await?; storage.source = dev_path; common_storage_handler(logger, &storage) @@ -633,12 +633,12 @@ fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result async fn nvdimm_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, ) -> Result { let storage = storage.clone(); // Retrieve the device path from NVDIMM address. - wait_for_pmem_device(&sandbox, &storage.source).await?; + wait_for_pmem_device(sandbox, &storage.source).await?; common_storage_handler(logger, &storage) } @@ -646,7 +646,7 @@ async fn nvdimm_storage_handler( async fn bind_watcher_storage_handler( logger: &Logger, storage: &Storage, - sandbox: Arc>, + sandbox: &Arc>, cid: Option, ) -> Result<()> { let mut locked = sandbox.lock().await; @@ -862,7 +862,7 @@ fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) { pub async fn add_storages( logger: Logger, storages: Vec, - sandbox: Arc>, + sandbox: &Arc>, cid: Option, ) -> Result> { let mut mount_list = Vec::new(); @@ -882,31 +882,22 @@ pub async fn add_storages( } let res = match handler_name.as_str() { - DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox.clone()).await, - DRIVER_BLK_CCW_TYPE => { - virtio_blk_ccw_storage_handler(&logger, &storage, sandbox.clone()).await - } - DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox.clone()).await, - DRIVER_VIRTIOFS_TYPE => { - virtiofs_storage_handler(&logger, &storage, sandbox.clone()).await - } - DRIVER_EPHEMERAL_TYPE => { - ephemeral_storage_handler(&logger, &storage, sandbox.clone()).await - } + DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox).await, + DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, &storage, sandbox).await, + DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox).await, + DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, &storage, sandbox).await, + DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, &storage, sandbox).await, DRIVER_OVERLAYFS_TYPE => { - overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox.clone()).await + overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox).await } DRIVER_MMIO_BLK_TYPE => { - virtiommio_blk_storage_handler(&logger, &storage, sandbox.clone()).await + virtiommio_blk_storage_handler(&logger, &storage, sandbox).await } - DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox.clone()).await, - DRIVER_SCSI_TYPE => { - virtio_scsi_storage_handler(&logger, &storage, sandbox.clone()).await - } - DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox.clone()).await, + DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox).await, + DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, &storage, sandbox).await, + DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox).await, DRIVER_WATCHABLE_BIND_TYPE => { - bind_watcher_storage_handler(&logger, &storage, sandbox.clone(), cid.clone()) - .await?; + bind_watcher_storage_handler(&logger, &storage, sandbox, cid.clone()).await?; // Don't register watch mounts, they're handled separately by the watcher. Ok(String::new()) } diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 3b7f0a4720..7b92f245e8 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -150,9 +150,6 @@ impl AgentService { let mut oci_spec = req.OCI.clone(); let use_sandbox_pidns = req.sandbox_pidns(); - let sandbox; - let mut s; - let mut oci = match oci_spec.as_mut() { Some(spec) => rustjail::grpc_to_oci(spec), None => { @@ -184,15 +181,13 @@ impl AgentService { let m = add_storages( sl(), req.storages.to_vec(), - self.sandbox.clone(), + &self.sandbox, Some(req.container_id.clone()), ) .await?; - { - sandbox = self.sandbox.clone(); - s = sandbox.lock().await; - s.container_mounts.insert(cid.clone(), m); - } + + let mut s = self.sandbox.lock().await; + s.container_mounts.insert(cid.clone(), m); update_container_namespaces(&s, &mut oci, use_sandbox_pidns)?; @@ -265,8 +260,7 @@ impl AgentService { async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { let cid = req.container_id; - let sandbox = self.sandbox.clone(); - let mut s = sandbox.lock().await; + let mut s = self.sandbox.lock().await; let sid = s.id.clone(); let ctr = s @@ -300,8 +294,7 @@ impl AgentService { let cid = req.container_id.clone(); if req.timeout == 0 { - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; sandbox.bind_watcher.remove_container(&cid).await; @@ -341,8 +334,7 @@ impl AgentService { return Err(anyhow!(nix::Error::UnknownErrno)); } - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; remove_container_resources(&mut sandbox, &cid)?; Ok(()) @@ -355,8 +347,7 @@ impl AgentService { info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id); - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let mut process = req .process @@ -383,7 +374,6 @@ impl AgentService { async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); - let s = self.sandbox.clone(); info!( sl(), @@ -395,7 +385,7 @@ impl AgentService { let mut sig: libc::c_int = req.signal as libc::c_int; { - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; // For container initProcess, if it hasn't installed handler for "SIGTERM" signal, // it will ignore the "SIGTERM" signal sent to it, thus send it "SIGKILL" signal @@ -468,8 +458,7 @@ impl AgentService { } async fn freeze_cgroup(&self, cid: &str, state: FreezerState) -> Result<()> { - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox .get_container(cid) .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; @@ -478,8 +467,7 @@ impl AgentService { } async fn get_pids(&self, cid: &str) -> Result> { - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox .get_container(cid) .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; @@ -494,7 +482,6 @@ impl AgentService { ) -> Result { let cid = req.container_id.clone(); let eid = req.exec_id; - let s = self.sandbox.clone(); let mut resp = WaitProcessResponse::new(); let pid: pid_t; @@ -508,7 +495,7 @@ impl AgentService { ); let exit_rx = { - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; p.exit_watchers.push(exit_send); @@ -523,7 +510,7 @@ impl AgentService { info!(sl(), "cid {} eid {} received exit signal", &cid, &eid); } - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox .get_container(&cid) .ok_or_else(|| anyhow!("Invalid container id"))?; @@ -565,8 +552,7 @@ impl AgentService { let eid = req.exec_id.clone(); let writer = { - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; // use ptmx io @@ -597,9 +583,7 @@ impl AgentService { let term_exit_notifier; let reader = { - let s = self.sandbox.clone(); - let mut sandbox = s.lock().await; - + let mut sandbox = self.sandbox.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; term_exit_notifier = p.term_exit_notifier.clone(); @@ -630,7 +614,7 @@ impl AgentService { // Poll::Ready so that the term_exit_notifier will never polled // before all data were read. biased; - v = read_stream(reader, req.len as usize) => { + v = read_stream(&reader, req.len as usize) => { let vector = v?; let mut resp = ReadStreamResponse::new(); resp.set_data(vector); @@ -734,8 +718,7 @@ impl agent_ttrpc::AgentService for AgentService { let cid = req.container_id.clone(); let res = req.resources; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { ttrpc_error( @@ -768,8 +751,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "stats_container", req); is_allowed(&req)?; let cid = req.container_id; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { ttrpc_error( @@ -790,8 +772,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "pause_container", req); is_allowed(&req)?; let cid = req.container_id(); - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { ttrpc_error( @@ -814,8 +795,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "resume_container", req); is_allowed(&req)?; let cid = req.container_id(); - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { ttrpc_error( @@ -896,8 +876,7 @@ impl agent_ttrpc::AgentService for AgentService { let cid = req.container_id.clone(); let eid = req.exec_id; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) @@ -923,8 +902,7 @@ impl agent_ttrpc::AgentService for AgentService { let cid = req.container_id.clone(); let eid = req.exec_id.clone(); - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { @@ -1028,7 +1006,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "update_mounts", req); is_allowed(&req)?; - match update_ephemeral_mounts(sl(), req.storages.to_vec(), self.sandbox.clone()).await { + match update_ephemeral_mounts(sl(), req.storages.to_vec(), &self.sandbox).await { Ok(_) => Ok(Empty::new()), Err(e) => Err(ttrpc_error( ttrpc::Code::INTERNAL, @@ -1251,8 +1229,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; { - let sandbox = self.sandbox.clone(); - let mut s = sandbox.lock().await; + let mut s = self.sandbox.lock().await; let _ = fs::remove_dir_all(CONTAINER_BASE); let _ = fs::create_dir_all(CONTAINER_BASE); @@ -1282,19 +1259,16 @@ impl agent_ttrpc::AgentService for AgentService { .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } - match add_storages(sl(), req.storages.to_vec(), self.sandbox.clone(), None).await { + match add_storages(sl(), req.storages.to_vec(), &self.sandbox, None).await { Ok(m) => { - let sandbox = self.sandbox.clone(); - let mut s = sandbox.lock().await; - s.mounts = m + self.sandbox.lock().await.mounts = m; } Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; match setup_guest_dns(sl(), req.dns.to_vec()) { Ok(_) => { - let sandbox = self.sandbox.clone(); - let mut s = sandbox.lock().await; + let mut s = self.sandbox.lock().await; let _dns = req .dns .to_vec() @@ -1315,8 +1289,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "destroy_sandbox", req); is_allowed(&req)?; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + let mut sandbox = self.sandbox.lock().await; // destroy all containers, clean up, notify agent to exit // etc. sandbox @@ -1383,8 +1356,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { is_allowed(&req)?; - let s = Arc::clone(&self.sandbox); - let sandbox = s.lock().await; + let sandbox = self.sandbox.lock().await; trace_rpc_call!(ctx, "online_cpu_mem", req); sandbox @@ -1507,12 +1479,10 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::GetOOMEventRequest, ) -> ttrpc::Result { is_allowed(&req)?; - let sandbox = self.sandbox.clone(); - let s = sandbox.lock().await; + let s = self.sandbox.lock().await; let event_rx = &s.event_rx.clone(); let mut event_rx = event_rx.lock().await; drop(s); - drop(sandbox); if let Some(container_id) = event_rx.recv().await { info!(sl(), "get_oom_event return {}", &container_id); @@ -1709,7 +1679,7 @@ fn get_agent_details() -> AgentDetails { detail } -async fn read_stream(reader: Arc>>, l: usize) -> Result> { +async fn read_stream(reader: &Mutex>, l: usize) -> Result> { let mut content = vec![0u8; l]; let mut reader = reader.lock().await;