From d3c54223793cb023578b48b3037ec3850d6177d4 Mon Sep 17 00:00:00 2001 From: Jiang Liu <gerry@linux.alibaba.com> Date: Sat, 5 Aug 2023 23:15:53 +0800 Subject: [PATCH] agent: refine style of code related to sandbox Refine style of code related to sandbox by: - remove unnecessary comments telling the caller to take a lock; the methods already take `&mut self`. - change "*count < 1" to "*count == 0", since `count` is of type u32. - make remove_sandbox_storage() take `&mut self` instead of `&self`. - group related functions together - avoid searching the map twice in function find_process() - avoid unwrap() in function run_oom_event_monitor() - avoid unwrap() in online_resources() Signed-off-by: Jiang Liu <gerry@linux.alibaba.com> --- src/agent/src/sandbox.rs | 172 ++++++++++++++++----------------------- 1 file changed, 72 insertions(+), 100 deletions(-) diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 1202bd793f..11a136c831 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -3,14 +3,14 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::linux_abi::*; -use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; -use crate::namespace::Namespace; -use crate::netlink::Handle; -use crate::network::Network; -use crate::pci; -use crate::uevent::{Uevent, UeventMatcher}; -use crate::watcher::BindWatcher; +use std::collections::HashMap; +use std::fs; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::str::FromStr; +use std::sync::Arc; +use std::{thread, time}; + use anyhow::{anyhow, Context, Result}; use kata_types::cpu::CpuSet; use libc::pid_t; @@ -22,18 +22,20 @@ use rustjail::container::BaseContainer; use rustjail::container::LinuxContainer; use rustjail::process::Process; use slog::Logger; -use std::collections::HashMap; -use std::fs; -use std::os::unix::fs::PermissionsExt; -use std::path::Path; -use std::str::FromStr; -use std::sync::Arc; -use std::{thread, time}; use tokio::sync::mpsc::{channel, Receiver, 
Sender}; use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing::instrument; +use crate::linux_abi::*; +use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; +use crate::namespace::Namespace; +use crate::netlink::Handle; +use crate::network::Network; +use crate::pci; +use crate::uevent::{Uevent, UeventMatcher}; +use crate::watcher::BindWatcher; + pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id"; type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>); @@ -103,9 +105,6 @@ impl Sandbox { // This method also returns a boolean to let // callers know if the storage already existed or not. // It will return true if storage is new. - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn set_sandbox_storage(&mut self, path: &str) -> bool { match self.storages.get_mut(path) { @@ -126,16 +125,13 @@ impl Sandbox { // storage reference from the sandbox and return 'true' to // let the caller know that they can clean up the storage // related directories by calling remove_sandbox_storage - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> { match self.storages.get_mut(path) { None => Err(anyhow!("Sandbox storage with path {} not found", path)), Some(count) => { *count -= 1; - if *count < 1 { + if *count == 0 { self.storages.remove(path); return Ok(true); } @@ -146,11 +142,8 @@ impl Sandbox { // remove_sandbox_storage removes the sandbox storage if no // containers are using that storage. - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. 
#[instrument] - pub fn remove_sandbox_storage(&self, path: &str) -> Result<()> { + pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> { let mounts = vec![path.to_string()]; remove_mounts(&mounts)?; // "remove_dir" will fail if the mount point is backed by a read-only filesystem. @@ -165,9 +158,6 @@ impl Sandbox { // unset_and_remove_sandbox_storage unsets the storage from sandbox // and if there are no containers using this storage it will // remove it from the sandbox. - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> { if self.unset_sandbox_storage(path)? { @@ -184,22 +174,18 @@ impl Sandbox { .get_ipc() .setup() .await - .context("Failed to setup persistent IPC namespace")?; + .context("setup persistent IPC namespace")?; // // Set up shared UTS namespace self.shared_utsns = Namespace::new(&self.logger) .get_uts(self.hostname.as_str()) .setup() .await - .context("Failed to setup persistent UTS namespace")?; + .context("setup persistent UTS namespace")?; Ok(true) } - pub fn add_container(&mut self, c: LinuxContainer) { - self.containers.insert(c.id.clone(), c); - } - #[instrument] pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> { // Populate the shared pid path only if this is an infra container and @@ -224,14 +210,18 @@ impl Sandbox { Ok(()) } + pub fn add_container(&mut self, c: LinuxContainer) { + self.containers.insert(c.id.clone(), c); + } + pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> { self.containers.get_mut(id) } pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> { for (_, c) in self.containers.iter_mut() { - if c.processes.get(&pid).is_some() { - return c.processes.get_mut(&pid); + if let Some(p) = c.processes.get_mut(&pid) { + return Some(p); } } @@ -280,24 +270,19 @@ impl Sandbox { let guest_cpuset = 
rustjail_cgroups::fs::get_guest_cpuset()?; for (_, ctr) in self.containers.iter() { - let cpu = ctr - .config - .spec - .as_ref() - .unwrap() - .linux - .as_ref() - .unwrap() - .resources - .as_ref() - .unwrap() - .cpu - .as_ref(); - let container_cpust = if let Some(c) = cpu { &c.cpus } else { "" }; + let mut container_cpust = ""; + if let Some(spec) = ctr.config.spec.as_ref() { + if let Some(linux) = spec.linux.as_ref() { + if let Some(resources) = linux.resources.as_ref() { + if let Some(cpus) = resources.cpu.as_ref() { + container_cpust = &cpus.cpus; + } + } + } + } info!(self.logger, "updating {}", ctr.id.as_str()); ctr.cgroup_manager - .as_ref() .update_cpuset_path(guest_cpuset.as_str(), container_cpust)?; } @@ -360,31 +345,28 @@ impl Sandbox { #[instrument] pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) { let logger = self.logger.clone(); - - if self.event_tx.is_none() { - error!( - logger, - "sandbox.event_tx not found in run_oom_event_monitor" - ); - return; - } - - let tx = self.event_tx.as_ref().unwrap().clone(); + let tx = match self.event_tx.as_ref() { + Some(v) => v.clone(), + None => { + error!( + logger, + "sandbox.event_tx not found in run_oom_event_monitor" + ); + return; + } + }; tokio::spawn(async move { loop { let event = rx.recv().await; - // None means the container has exited, - // and sender in OOM notifier is dropped. + // None means the container has exited, and sender in OOM notifier is dropped. if event.is_none() { return; } info!(logger, "got an OOM event {:?}", event); - - let _ = tx - .send(container_id.clone()) - .await - .map_err(|e| error!(logger, "failed to send message: {:?}", e)); + if let Err(e) = tx.send(container_id.clone()).await { + error!(logger, "failed to send message: {:?}", e); + } } }); } @@ -397,39 +379,36 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Res for e in fs::read_dir(path)? 
{ let entry = e?; - let tmpname = entry.file_name(); - let name = tmpname.to_str().unwrap(); - let p = entry.path(); - - if re.is_match(name) { - let file = format!("{}/{}", p.to_str().unwrap(), SYSFS_ONLINE_FILE); - info!(logger, "{}", file.as_str()); - - let c = fs::read_to_string(file.as_str()); - if c.is_err() { - continue; - } - let c = c.unwrap(); - - if c.trim().contains('0') { - let r = fs::write(file.as_str(), "1"); - if r.is_err() { + // Skip direntry which doesn't match the pattern. + match entry.file_name().to_str() { + None => continue, + Some(v) => { + if !re.is_match(v) { continue; } - count += 1; + } + }; - if num > 0 && count == num { + let p = entry.path().join(SYSFS_ONLINE_FILE); + if let Ok(c) = fs::read_to_string(&p) { + // Try to online the object in offline state. + if c.trim().contains('0') && fs::write(&p, "1").is_ok() && num > 0 { + count += 1; + if count == num { break; } } } } - if num > 0 { - return Ok(count); - } + Ok(count) +} - Ok(0) +#[instrument] +fn online_memory(logger: &Logger) -> Result<()> { + online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1) + .context("online memory resource")?; + Ok(()) } // max wait for all CPUs to online will use 50 * 100 = 5 seconds. @@ -473,13 +452,6 @@ fn online_cpus(logger: &Logger, num: i32) -> Result<i32> { )) } -#[instrument] -fn online_memory(logger: &Logger) -> Result<()> { - online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1) - .context("online memory resource")?; - Ok(()) -} - fn onlined_cpus() -> Result<i32> { let content = fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?; @@ -555,7 +527,7 @@ mod tests { skip_if_not_root!(); let logger = slog::Logger::root(slog::Discard, o!()); - let s = Sandbox::new(&logger).unwrap(); + let mut s = Sandbox::new(&logger).unwrap(); let tmpdir = Builder::new().tempdir().unwrap(); let tmpdir_path = tmpdir.path().to_str().unwrap();