agent: refine style of code related to sandbox

Refine style of code related to sandbox by:
- remove unnecessary comments for caller to take lock, we have already taken
  `&mut self`.
- change "*count < 1 " to "*count == 0", `count` is type of u32.
- make remove_sandbox_storage() to take `&mut self` instead of `&self`.
- group related function to each others
- avoid search the map twice in function find_process()
- avoid unwrap() in function run_oom_event_monitor()
- avoid unwrap() in online_resources()

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-05 23:15:53 +08:00
parent 71a9f67781
commit d3c5422379

View File

@ -3,14 +3,14 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use crate::linux_abi::*; use std::collections::HashMap;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; use std::fs;
use crate::namespace::Namespace; use std::os::unix::fs::PermissionsExt;
use crate::netlink::Handle; use std::path::Path;
use crate::network::Network; use std::str::FromStr;
use crate::pci; use std::sync::Arc;
use crate::uevent::{Uevent, UeventMatcher}; use std::{thread, time};
use crate::watcher::BindWatcher;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use kata_types::cpu::CpuSet; use kata_types::cpu::CpuSet;
use libc::pid_t; use libc::pid_t;
@ -22,18 +22,20 @@ use rustjail::container::BaseContainer;
use rustjail::container::LinuxContainer; use rustjail::container::LinuxContainer;
use rustjail::process::Process; use rustjail::process::Process;
use slog::Logger; use slog::Logger;
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::{thread, time};
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::instrument; use tracing::instrument;
use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
use crate::namespace::Namespace;
use crate::netlink::Handle;
use crate::network::Network;
use crate::pci;
use crate::uevent::{Uevent, UeventMatcher};
use crate::watcher::BindWatcher;
pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id"; pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>); type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
@ -103,9 +105,6 @@ impl Sandbox {
// This method also returns a boolean to let // This method also returns a boolean to let
// callers know if the storage already existed or not. // callers know if the storage already existed or not.
// It will return true if storage is new. // It will return true if storage is new.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument] #[instrument]
pub fn set_sandbox_storage(&mut self, path: &str) -> bool { pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
match self.storages.get_mut(path) { match self.storages.get_mut(path) {
@ -126,16 +125,13 @@ impl Sandbox {
// storage reference from the sandbox and return 'true' to // storage reference from the sandbox and return 'true' to
// let the caller know that they can clean up the storage // let the caller know that they can clean up the storage
// related directories by calling remove_sandbox_storage // related directories by calling remove_sandbox_storage
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument] #[instrument]
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> { pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
match self.storages.get_mut(path) { match self.storages.get_mut(path) {
None => Err(anyhow!("Sandbox storage with path {} not found", path)), None => Err(anyhow!("Sandbox storage with path {} not found", path)),
Some(count) => { Some(count) => {
*count -= 1; *count -= 1;
if *count < 1 { if *count == 0 {
self.storages.remove(path); self.storages.remove(path);
return Ok(true); return Ok(true);
} }
@ -146,11 +142,8 @@ impl Sandbox {
// remove_sandbox_storage removes the sandbox storage if no // remove_sandbox_storage removes the sandbox storage if no
// containers are using that storage. // containers are using that storage.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument] #[instrument]
pub fn remove_sandbox_storage(&self, path: &str) -> Result<()> { pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
let mounts = vec![path.to_string()]; let mounts = vec![path.to_string()];
remove_mounts(&mounts)?; remove_mounts(&mounts)?;
// "remove_dir" will fail if the mount point is backed by a read-only filesystem. // "remove_dir" will fail if the mount point is backed by a read-only filesystem.
@ -165,9 +158,6 @@ impl Sandbox {
// unset_and_remove_sandbox_storage unsets the storage from sandbox // unset_and_remove_sandbox_storage unsets the storage from sandbox
// and if there are no containers using this storage it will // and if there are no containers using this storage it will
// remove it from the sandbox. // remove it from the sandbox.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument] #[instrument]
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> { pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
if self.unset_sandbox_storage(path)? { if self.unset_sandbox_storage(path)? {
@ -184,22 +174,18 @@ impl Sandbox {
.get_ipc() .get_ipc()
.setup() .setup()
.await .await
.context("Failed to setup persistent IPC namespace")?; .context("setup persistent IPC namespace")?;
// // Set up shared UTS namespace // // Set up shared UTS namespace
self.shared_utsns = Namespace::new(&self.logger) self.shared_utsns = Namespace::new(&self.logger)
.get_uts(self.hostname.as_str()) .get_uts(self.hostname.as_str())
.setup() .setup()
.await .await
.context("Failed to setup persistent UTS namespace")?; .context("setup persistent UTS namespace")?;
Ok(true) Ok(true)
} }
pub fn add_container(&mut self, c: LinuxContainer) {
self.containers.insert(c.id.clone(), c);
}
#[instrument] #[instrument]
pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> { pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> {
// Populate the shared pid path only if this is an infra container and // Populate the shared pid path only if this is an infra container and
@ -224,14 +210,18 @@ impl Sandbox {
Ok(()) Ok(())
} }
pub fn add_container(&mut self, c: LinuxContainer) {
self.containers.insert(c.id.clone(), c);
}
pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> { pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> {
self.containers.get_mut(id) self.containers.get_mut(id)
} }
pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> { pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> {
for (_, c) in self.containers.iter_mut() { for (_, c) in self.containers.iter_mut() {
if c.processes.get(&pid).is_some() { if let Some(p) = c.processes.get_mut(&pid) {
return c.processes.get_mut(&pid); return Some(p);
} }
} }
@ -280,24 +270,19 @@ impl Sandbox {
let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?; let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?;
for (_, ctr) in self.containers.iter() { for (_, ctr) in self.containers.iter() {
let cpu = ctr let mut container_cpust = "";
.config if let Some(spec) = ctr.config.spec.as_ref() {
.spec if let Some(linux) = spec.linux.as_ref() {
.as_ref() if let Some(resources) = linux.resources.as_ref() {
.unwrap() if let Some(cpus) = resources.cpu.as_ref() {
.linux container_cpust = &cpus.cpus;
.as_ref() }
.unwrap() }
.resources }
.as_ref() }
.unwrap()
.cpu
.as_ref();
let container_cpust = if let Some(c) = cpu { &c.cpus } else { "" };
info!(self.logger, "updating {}", ctr.id.as_str()); info!(self.logger, "updating {}", ctr.id.as_str());
ctr.cgroup_manager ctr.cgroup_manager
.as_ref()
.update_cpuset_path(guest_cpuset.as_str(), container_cpust)?; .update_cpuset_path(guest_cpuset.as_str(), container_cpust)?;
} }
@ -360,31 +345,28 @@ impl Sandbox {
#[instrument] #[instrument]
pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) { pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) {
let logger = self.logger.clone(); let logger = self.logger.clone();
let tx = match self.event_tx.as_ref() {
if self.event_tx.is_none() { Some(v) => v.clone(),
error!( None => {
logger, error!(
"sandbox.event_tx not found in run_oom_event_monitor" logger,
); "sandbox.event_tx not found in run_oom_event_monitor"
return; );
} return;
}
let tx = self.event_tx.as_ref().unwrap().clone(); };
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let event = rx.recv().await; let event = rx.recv().await;
// None means the container has exited, // None means the container has exited, and sender in OOM notifier is dropped.
// and sender in OOM notifier is dropped.
if event.is_none() { if event.is_none() {
return; return;
} }
info!(logger, "got an OOM event {:?}", event); info!(logger, "got an OOM event {:?}", event);
if let Err(e) = tx.send(container_id.clone()).await {
let _ = tx error!(logger, "failed to send message: {:?}", e);
.send(container_id.clone()) }
.await
.map_err(|e| error!(logger, "failed to send message: {:?}", e));
} }
}); });
} }
@ -397,39 +379,36 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Res
for e in fs::read_dir(path)? { for e in fs::read_dir(path)? {
let entry = e?; let entry = e?;
let tmpname = entry.file_name(); // Skip direntry which doesn't match the pattern.
let name = tmpname.to_str().unwrap(); match entry.file_name().to_str() {
let p = entry.path(); None => continue,
Some(v) => {
if re.is_match(name) { if !re.is_match(v) {
let file = format!("{}/{}", p.to_str().unwrap(), SYSFS_ONLINE_FILE);
info!(logger, "{}", file.as_str());
let c = fs::read_to_string(file.as_str());
if c.is_err() {
continue;
}
let c = c.unwrap();
if c.trim().contains('0') {
let r = fs::write(file.as_str(), "1");
if r.is_err() {
continue; continue;
} }
count += 1; }
};
if num > 0 && count == num { let p = entry.path().join(SYSFS_ONLINE_FILE);
if let Ok(c) = fs::read_to_string(&p) {
// Try to online the object in offline state.
if c.trim().contains('0') && fs::write(&p, "1").is_ok() && num > 0 {
count += 1;
if count == num {
break; break;
} }
} }
} }
} }
if num > 0 { Ok(count)
return Ok(count); }
}
Ok(0) #[instrument]
fn online_memory(logger: &Logger) -> Result<()> {
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
.context("online memory resource")?;
Ok(())
} }
// max wait for all CPUs to online will use 50 * 100 = 5 seconds. // max wait for all CPUs to online will use 50 * 100 = 5 seconds.
@ -473,13 +452,6 @@ fn online_cpus(logger: &Logger, num: i32) -> Result<i32> {
)) ))
} }
#[instrument]
fn online_memory(logger: &Logger) -> Result<()> {
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
.context("online memory resource")?;
Ok(())
}
fn onlined_cpus() -> Result<i32> { fn onlined_cpus() -> Result<i32> {
let content = let content =
fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?; fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?;
@ -555,7 +527,7 @@ mod tests {
skip_if_not_root!(); skip_if_not_root!();
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let s = Sandbox::new(&logger).unwrap(); let mut s = Sandbox::new(&logger).unwrap();
let tmpdir = Builder::new().tempdir().unwrap(); let tmpdir = Builder::new().tempdir().unwrap();
let tmpdir_path = tmpdir.path().to_str().unwrap(); let tmpdir_path = tmpdir.path().to_str().unwrap();