agent: refine style of code related to sandbox

Refine style of code related to sandbox by:
- remove unnecessary comments for caller to take lock, we have already taken
  `&mut self`.
- change "*count < 1 " to "*count == 0", `count` is type of u32.
- make remove_sandbox_storage() to take `&mut self` instead of `&self`.
- group related function to each others
- avoid search the map twice in function find_process()
- avoid unwrap() in function run_oom_event_monitor()
- avoid unwrap() in online_resources()

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-08-05 23:15:53 +08:00
parent 71a9f67781
commit d3c5422379

View File

@ -3,14 +3,14 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
use crate::namespace::Namespace;
use crate::netlink::Handle;
use crate::network::Network;
use crate::pci;
use crate::uevent::{Uevent, UeventMatcher};
use crate::watcher::BindWatcher;
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::{thread, time};
use anyhow::{anyhow, Context, Result};
use kata_types::cpu::CpuSet;
use libc::pid_t;
@ -22,18 +22,20 @@ use rustjail::container::BaseContainer;
use rustjail::container::LinuxContainer;
use rustjail::process::Process;
use slog::Logger;
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::{thread, time};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::instrument;
use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
use crate::namespace::Namespace;
use crate::netlink::Handle;
use crate::network::Network;
use crate::pci;
use crate::uevent::{Uevent, UeventMatcher};
use crate::watcher::BindWatcher;
pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
@ -103,9 +105,6 @@ impl Sandbox {
// This method also returns a boolean to let
// callers know if the storage already existed or not.
// It will return true if storage is new.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument]
pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
match self.storages.get_mut(path) {
@ -126,16 +125,13 @@ impl Sandbox {
// storage reference from the sandbox and return 'true' to
// let the caller know that they can clean up the storage
// related directories by calling remove_sandbox_storage
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument]
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
match self.storages.get_mut(path) {
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
Some(count) => {
*count -= 1;
if *count < 1 {
if *count == 0 {
self.storages.remove(path);
return Ok(true);
}
@ -146,11 +142,8 @@ impl Sandbox {
// remove_sandbox_storage removes the sandbox storage if no
// containers are using that storage.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument]
pub fn remove_sandbox_storage(&self, path: &str) -> Result<()> {
pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
let mounts = vec![path.to_string()];
remove_mounts(&mounts)?;
// "remove_dir" will fail if the mount point is backed by a read-only filesystem.
@ -165,9 +158,6 @@ impl Sandbox {
// unset_and_remove_sandbox_storage unsets the storage from sandbox
// and if there are no containers using this storage it will
// remove it from the sandbox.
//
// It's assumed that caller is calling this method after
// acquiring a lock on sandbox.
#[instrument]
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
if self.unset_sandbox_storage(path)? {
@ -184,22 +174,18 @@ impl Sandbox {
.get_ipc()
.setup()
.await
.context("Failed to setup persistent IPC namespace")?;
.context("setup persistent IPC namespace")?;
// // Set up shared UTS namespace
self.shared_utsns = Namespace::new(&self.logger)
.get_uts(self.hostname.as_str())
.setup()
.await
.context("Failed to setup persistent UTS namespace")?;
.context("setup persistent UTS namespace")?;
Ok(true)
}
pub fn add_container(&mut self, c: LinuxContainer) {
self.containers.insert(c.id.clone(), c);
}
#[instrument]
pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> {
// Populate the shared pid path only if this is an infra container and
@ -224,14 +210,18 @@ impl Sandbox {
Ok(())
}
pub fn add_container(&mut self, c: LinuxContainer) {
self.containers.insert(c.id.clone(), c);
}
pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> {
self.containers.get_mut(id)
}
pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> {
for (_, c) in self.containers.iter_mut() {
if c.processes.get(&pid).is_some() {
return c.processes.get_mut(&pid);
if let Some(p) = c.processes.get_mut(&pid) {
return Some(p);
}
}
@ -280,24 +270,19 @@ impl Sandbox {
let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?;
for (_, ctr) in self.containers.iter() {
let cpu = ctr
.config
.spec
.as_ref()
.unwrap()
.linux
.as_ref()
.unwrap()
.resources
.as_ref()
.unwrap()
.cpu
.as_ref();
let container_cpust = if let Some(c) = cpu { &c.cpus } else { "" };
let mut container_cpust = "";
if let Some(spec) = ctr.config.spec.as_ref() {
if let Some(linux) = spec.linux.as_ref() {
if let Some(resources) = linux.resources.as_ref() {
if let Some(cpus) = resources.cpu.as_ref() {
container_cpust = &cpus.cpus;
}
}
}
}
info!(self.logger, "updating {}", ctr.id.as_str());
ctr.cgroup_manager
.as_ref()
.update_cpuset_path(guest_cpuset.as_str(), container_cpust)?;
}
@ -360,31 +345,28 @@ impl Sandbox {
#[instrument]
pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) {
let logger = self.logger.clone();
if self.event_tx.is_none() {
error!(
logger,
"sandbox.event_tx not found in run_oom_event_monitor"
);
return;
}
let tx = self.event_tx.as_ref().unwrap().clone();
let tx = match self.event_tx.as_ref() {
Some(v) => v.clone(),
None => {
error!(
logger,
"sandbox.event_tx not found in run_oom_event_monitor"
);
return;
}
};
tokio::spawn(async move {
loop {
let event = rx.recv().await;
// None means the container has exited,
// and sender in OOM notifier is dropped.
// None means the container has exited, and sender in OOM notifier is dropped.
if event.is_none() {
return;
}
info!(logger, "got an OOM event {:?}", event);
let _ = tx
.send(container_id.clone())
.await
.map_err(|e| error!(logger, "failed to send message: {:?}", e));
if let Err(e) = tx.send(container_id.clone()).await {
error!(logger, "failed to send message: {:?}", e);
}
}
});
}
@ -397,39 +379,36 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Res
for e in fs::read_dir(path)? {
let entry = e?;
let tmpname = entry.file_name();
let name = tmpname.to_str().unwrap();
let p = entry.path();
if re.is_match(name) {
let file = format!("{}/{}", p.to_str().unwrap(), SYSFS_ONLINE_FILE);
info!(logger, "{}", file.as_str());
let c = fs::read_to_string(file.as_str());
if c.is_err() {
continue;
}
let c = c.unwrap();
if c.trim().contains('0') {
let r = fs::write(file.as_str(), "1");
if r.is_err() {
// Skip direntry which doesn't match the pattern.
match entry.file_name().to_str() {
None => continue,
Some(v) => {
if !re.is_match(v) {
continue;
}
count += 1;
}
};
if num > 0 && count == num {
let p = entry.path().join(SYSFS_ONLINE_FILE);
if let Ok(c) = fs::read_to_string(&p) {
// Try to online the object in offline state.
if c.trim().contains('0') && fs::write(&p, "1").is_ok() && num > 0 {
count += 1;
if count == num {
break;
}
}
}
}
if num > 0 {
return Ok(count);
}
Ok(count)
}
Ok(0)
#[instrument]
fn online_memory(logger: &Logger) -> Result<()> {
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
.context("online memory resource")?;
Ok(())
}
// max wait for all CPUs to online will use 50 * 100 = 5 seconds.
@ -473,13 +452,6 @@ fn online_cpus(logger: &Logger, num: i32) -> Result<i32> {
))
}
#[instrument]
fn online_memory(logger: &Logger) -> Result<()> {
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
.context("online memory resource")?;
Ok(())
}
fn onlined_cpus() -> Result<i32> {
let content =
fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?;
@ -555,7 +527,7 @@ mod tests {
skip_if_not_root!();
let logger = slog::Logger::root(slog::Discard, o!());
let s = Sandbox::new(&logger).unwrap();
let mut s = Sandbox::new(&logger).unwrap();
let tmpdir = Builder::new().tempdir().unwrap();
let tmpdir_path = tmpdir.path().to_str().unwrap();