mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-08-17 15:38:00 +00:00
Merge pull request #7552 from jiangliu/agent-r1
Fix mimor bugs and improve coding stype of agent rpc/sandbox/mount
This commit is contained in:
commit
311671abb5
@ -4,8 +4,8 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fmt::Debug;
|
||||||
use std::fs::{File, OpenOptions};
|
use std::fs::{self, File, OpenOptions};
|
||||||
use std::io::{BufRead, BufReader, Write};
|
use std::io::{BufRead, BufReader, Write};
|
||||||
use std::iter;
|
use std::iter;
|
||||||
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
use std::os::unix::fs::{MetadataExt, PermissionsExt};
|
||||||
@ -13,12 +13,13 @@ use std::path::Path;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::sync::Mutex;
|
use anyhow::{anyhow, Context, Result};
|
||||||
|
|
||||||
use nix::mount::MsFlags;
|
use nix::mount::MsFlags;
|
||||||
use nix::unistd::{Gid, Uid};
|
use nix::unistd::{Gid, Uid};
|
||||||
|
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
use slog::Logger;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tracing::instrument;
|
||||||
|
|
||||||
use crate::device::{
|
use crate::device::{
|
||||||
get_scsi_device_name, get_virtio_blk_pci_device_name, get_virtio_mmio_device_name,
|
get_scsi_device_name, get_virtio_blk_pci_device_name, get_virtio_mmio_device_name,
|
||||||
@ -34,17 +35,13 @@ use crate::protocols::types::FSGroupChangePolicy;
|
|||||||
use crate::Sandbox;
|
use crate::Sandbox;
|
||||||
#[cfg(target_arch = "s390x")]
|
#[cfg(target_arch = "s390x")]
|
||||||
use crate::{ccw, device::get_virtio_blk_ccw_device_name};
|
use crate::{ccw, device::get_virtio_blk_ccw_device_name};
|
||||||
use anyhow::{anyhow, Context, Result};
|
|
||||||
use slog::Logger;
|
|
||||||
|
|
||||||
use tracing::instrument;
|
|
||||||
|
|
||||||
pub const TYPE_ROOTFS: &str = "rootfs";
|
pub const TYPE_ROOTFS: &str = "rootfs";
|
||||||
const SYS_FS_HUGEPAGES_PREFIX: &str = "/sys/kernel/mm/hugepages";
|
|
||||||
pub const MOUNT_GUEST_TAG: &str = "kataShared";
|
pub const MOUNT_GUEST_TAG: &str = "kataShared";
|
||||||
|
|
||||||
// Allocating an FSGroup that owns the pod's volumes
|
// Allocating an FSGroup that owns the pod's volumes
|
||||||
const FS_GID: &str = "fsgid";
|
const FS_GID: &str = "fsgid";
|
||||||
|
const SYS_FS_HUGEPAGES_PREFIX: &str = "/sys/kernel/mm/hugepages";
|
||||||
|
|
||||||
const RW_MASK: u32 = 0o660;
|
const RW_MASK: u32 = 0o660;
|
||||||
const RO_MASK: u32 = 0o440;
|
const RO_MASK: u32 = 0o440;
|
||||||
@ -218,9 +215,9 @@ pub fn baremount(
|
|||||||
)
|
)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
anyhow!(
|
anyhow!(
|
||||||
"failed to mount {:?} to {:?}, with error: {}",
|
"failed to mount {} to {}, with error: {}",
|
||||||
source,
|
source.display(),
|
||||||
destination,
|
destination.display(),
|
||||||
e
|
e
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@ -230,7 +227,7 @@ pub fn baremount(
|
|||||||
async fn ephemeral_storage_handler(
|
async fn ephemeral_storage_handler(
|
||||||
logger: &Logger,
|
logger: &Logger,
|
||||||
storage: &Storage,
|
storage: &Storage,
|
||||||
sandbox: &Arc<Mutex<Sandbox>>,
|
_sandbox: &Arc<Mutex<Sandbox>>,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
// hugetlbfs
|
// hugetlbfs
|
||||||
if storage.fstype == FS_TYPE_HUGETLB {
|
if storage.fstype == FS_TYPE_HUGETLB {
|
||||||
@ -238,21 +235,19 @@ async fn ephemeral_storage_handler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// normal ephemeral storage
|
// normal ephemeral storage
|
||||||
fs::create_dir_all(Path::new(&storage.mount_point))?;
|
fs::create_dir_all(&storage.mount_point)?;
|
||||||
|
|
||||||
// By now we only support one option field: "fsGroup" which
|
|
||||||
// isn't an valid mount option, thus we should remove it when
|
|
||||||
// do mount.
|
|
||||||
if !storage.options.is_empty() {
|
if !storage.options.is_empty() {
|
||||||
// ephemeral_storage didn't support mount options except fsGroup.
|
// By now we only support one option field: "fsGroup" which
|
||||||
|
// isn't an valid mount option, thus we should remove it when
|
||||||
|
// do mount.
|
||||||
let mut new_storage = storage.clone();
|
let mut new_storage = storage.clone();
|
||||||
new_storage.options = Default::default();
|
new_storage.options = Default::default();
|
||||||
common_storage_handler(logger, &new_storage)?;
|
common_storage_handler(logger, &new_storage)?;
|
||||||
|
|
||||||
let opts_vec: Vec<String> = storage.options.to_vec();
|
let opts = parse_options(&storage.options);
|
||||||
|
|
||||||
let opts = parse_options(opts_vec);
|
|
||||||
|
|
||||||
|
// ephemeral_storage didn't support mount options except fsGroup.
|
||||||
if let Some(fsgid) = opts.get(FS_GID) {
|
if let Some(fsgid) = opts.get(FS_GID) {
|
||||||
let gid = fsgid.parse::<u32>()?;
|
let gid = fsgid.parse::<u32>()?;
|
||||||
|
|
||||||
@ -278,56 +273,57 @@ async fn ephemeral_storage_handler(
|
|||||||
pub async fn update_ephemeral_mounts(
|
pub async fn update_ephemeral_mounts(
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
storages: Vec<Storage>,
|
storages: Vec<Storage>,
|
||||||
sandbox: &Arc<Mutex<Sandbox>>,
|
_sandbox: &Arc<Mutex<Sandbox>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
for (_, storage) in storages.iter().enumerate() {
|
for (_, storage) in storages.iter().enumerate() {
|
||||||
let handler_name = storage.driver.clone();
|
let handler_name = &storage.driver;
|
||||||
let logger = logger.new(o!(
|
let logger = logger.new(o!(
|
||||||
"msg" => "updating tmpfs storage",
|
"msg" => "updating tmpfs storage",
|
||||||
"subsystem" => "storage",
|
"subsystem" => "storage",
|
||||||
"storage-type" => handler_name.to_owned()));
|
"storage-type" => handler_name.to_owned()));
|
||||||
|
|
||||||
match handler_name.as_str() {
|
match handler_name.as_str() {
|
||||||
DRIVER_EPHEMERAL_TYPE => {
|
DRIVER_EPHEMERAL_TYPE => {
|
||||||
fs::create_dir_all(Path::new(&storage.mount_point))?;
|
fs::create_dir_all(&storage.mount_point)?;
|
||||||
|
|
||||||
if storage.options.is_empty() {
|
if storage.options.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
// assume that fsGid has already been set
|
// assume that fsGid has already been set
|
||||||
let mut opts = Vec::<&str>::new();
|
let mut opts = Vec::new();
|
||||||
for (_, opt) in storage.options.iter().enumerate() {
|
for (_, opt) in storage.options.iter().enumerate() {
|
||||||
if opt.starts_with(FS_GID) {
|
let fields: Vec<&str> = opt.split('=').collect();
|
||||||
|
if fields.len() == 2 && fields[0] == FS_GID {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
opts.push(opt)
|
opts.push(opt.as_str())
|
||||||
}
|
}
|
||||||
|
let (flags, options) = parse_mount_flags_and_options(&opts);
|
||||||
|
|
||||||
let mount_path = Path::new(&storage.mount_point);
|
let mount_path = Path::new(&storage.mount_point);
|
||||||
let src_path = Path::new(&storage.source);
|
let src_path = Path::new(&storage.source);
|
||||||
|
|
||||||
let (flags, options) = parse_mount_flags_and_options(opts);
|
|
||||||
|
|
||||||
info!(logger, "mounting storage";
|
info!(logger, "mounting storage";
|
||||||
"mount-source" => src_path.display(),
|
"mount-source" => src_path.display(),
|
||||||
"mount-destination" => mount_path.display(),
|
"mount-destination" => mount_path.display(),
|
||||||
"mount-fstype" => storage.fstype.as_str(),
|
"mount-fstype" => storage.fstype.as_str(),
|
||||||
"mount-options" => options.as_str(),
|
"mount-options" => options.as_str(),
|
||||||
);
|
);
|
||||||
|
|
||||||
return baremount(
|
baremount(
|
||||||
src_path,
|
src_path,
|
||||||
mount_path,
|
mount_path,
|
||||||
storage.fstype.as_str(),
|
storage.fstype.as_str(),
|
||||||
flags,
|
flags,
|
||||||
options.as_str(),
|
options.as_str(),
|
||||||
&logger,
|
&logger,
|
||||||
);
|
)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
return Err(anyhow!(
|
return Err(anyhow!(
|
||||||
"Unsupported storage type for syncing mounts {}. Only ephemeral storage update is supported",
|
"Unsupported storage type for syncing mounts {}. Only ephemeral storage update is supported",
|
||||||
storage.driver.to_owned()
|
storage.driver
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -374,16 +370,14 @@ async fn overlayfs_storage_handler(
|
|||||||
async fn local_storage_handler(
|
async fn local_storage_handler(
|
||||||
_logger: &Logger,
|
_logger: &Logger,
|
||||||
storage: &Storage,
|
storage: &Storage,
|
||||||
sandbox: &Arc<Mutex<Sandbox>>,
|
_sandbox: &Arc<Mutex<Sandbox>>,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
fs::create_dir_all(&storage.mount_point).context(format!(
|
fs::create_dir_all(&storage.mount_point).context(format!(
|
||||||
"failed to create dir all {:?}",
|
"failed to create dir all {:?}",
|
||||||
&storage.mount_point
|
&storage.mount_point
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
let opts_vec: Vec<String> = storage.options.to_vec();
|
let opts = parse_options(&storage.options);
|
||||||
|
|
||||||
let opts = parse_options(opts_vec);
|
|
||||||
|
|
||||||
let mut need_set_fsgid = false;
|
let mut need_set_fsgid = false;
|
||||||
if let Some(fsgid) = opts.get(FS_GID) {
|
if let Some(fsgid) = opts.get(FS_GID) {
|
||||||
@ -563,7 +557,6 @@ async fn virtio_blk_storage_handler(
|
|||||||
if storage.source.starts_with("/dev") {
|
if storage.source.starts_with("/dev") {
|
||||||
let metadata = fs::metadata(&storage.source)
|
let metadata = fs::metadata(&storage.source)
|
||||||
.context(format!("get metadata on file {:?}", &storage.source))?;
|
.context(format!("get metadata on file {:?}", &storage.source))?;
|
||||||
|
|
||||||
let mode = metadata.permissions().mode();
|
let mode = metadata.permissions().mode();
|
||||||
if mode & libc::S_IFBLK == 0 {
|
if mode & libc::S_IFBLK == 0 {
|
||||||
return Err(anyhow!("Invalid device {}", &storage.source));
|
return Err(anyhow!("Invalid device {}", &storage.source));
|
||||||
@ -620,12 +613,9 @@ async fn virtio_scsi_storage_handler(
|
|||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String> {
|
fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result<String> {
|
||||||
// Mount the storage device.
|
|
||||||
let mount_point = storage.mount_point.to_string();
|
|
||||||
|
|
||||||
mount_storage(logger, storage)?;
|
mount_storage(logger, storage)?;
|
||||||
set_ownership(logger, storage)?;
|
set_ownership(logger, storage)?;
|
||||||
Ok(mount_point)
|
Ok(storage.mount_point.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
// nvdimm_storage_handler handles the storage for NVDIMM driver.
|
// nvdimm_storage_handler handles the storage for NVDIMM driver.
|
||||||
@ -666,9 +656,8 @@ async fn bind_watcher_storage_handler(
|
|||||||
fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
|
fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
|
||||||
let logger = logger.new(o!("subsystem" => "mount"));
|
let logger = logger.new(o!("subsystem" => "mount"));
|
||||||
|
|
||||||
// Check share before attempting to mount to see if the destination is already a mount point.
|
// There's a special mechanism to create mountpoint from a `sharedfs` instance before
|
||||||
// If so, skip doing the mount. This facilitates mounting the sharedfs automatically
|
// starting the kata-agent. Check for such cases.
|
||||||
// in the guest before the agent service starts.
|
|
||||||
if storage.source == MOUNT_GUEST_TAG && is_mounted(&storage.mount_point)? {
|
if storage.source == MOUNT_GUEST_TAG && is_mounted(&storage.mount_point)? {
|
||||||
warn!(
|
warn!(
|
||||||
logger,
|
logger,
|
||||||
@ -680,27 +669,23 @@ fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> {
|
|||||||
let mount_path = Path::new(&storage.mount_point);
|
let mount_path = Path::new(&storage.mount_point);
|
||||||
let src_path = Path::new(&storage.source);
|
let src_path = Path::new(&storage.source);
|
||||||
if storage.fstype == "bind" && !src_path.is_dir() {
|
if storage.fstype == "bind" && !src_path.is_dir() {
|
||||||
ensure_destination_file_exists(mount_path)
|
ensure_destination_file_exists(mount_path).context("Could not create mountpoint file")?;
|
||||||
} else {
|
} else {
|
||||||
fs::create_dir_all(mount_path).map_err(anyhow::Error::from)
|
fs::create_dir_all(mount_path)
|
||||||
|
.map_err(anyhow::Error::from)
|
||||||
|
.context("Could not create mountpoint")?;
|
||||||
}
|
}
|
||||||
.context("Could not create mountpoint")?;
|
let (flags, options) = parse_mount_flags_and_options(&storage.options);
|
||||||
|
|
||||||
let options_vec = storage.options.to_vec();
|
|
||||||
let options_vec = options_vec.iter().map(String::as_str).collect();
|
|
||||||
let (flags, options) = parse_mount_flags_and_options(options_vec);
|
|
||||||
|
|
||||||
let source = Path::new(&storage.source);
|
|
||||||
|
|
||||||
info!(logger, "mounting storage";
|
info!(logger, "mounting storage";
|
||||||
"mount-source" => source.display(),
|
"mount-source" => src_path.display(),
|
||||||
"mount-destination" => mount_path.display(),
|
"mount-destination" => mount_path.display(),
|
||||||
"mount-fstype" => storage.fstype.as_str(),
|
"mount-fstype" => storage.fstype.as_str(),
|
||||||
"mount-options" => options.as_str(),
|
"mount-options" => options.as_str(),
|
||||||
);
|
);
|
||||||
|
|
||||||
baremount(
|
baremount(
|
||||||
source,
|
src_path,
|
||||||
mount_path,
|
mount_path,
|
||||||
storage.fstype.as_str(),
|
storage.fstype.as_str(),
|
||||||
flags,
|
flags,
|
||||||
@ -717,14 +702,9 @@ pub fn set_ownership(logger: &Logger, storage: &Storage) -> Result<()> {
|
|||||||
if storage.fs_group.is_none() {
|
if storage.fs_group.is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let fs_group = storage.fs_group();
|
let fs_group = storage.fs_group();
|
||||||
|
let read_only = storage.options.contains(&String::from("ro"));
|
||||||
let mut read_only = false;
|
|
||||||
let opts_vec: Vec<String> = storage.options.to_vec();
|
|
||||||
if opts_vec.contains(&String::from("ro")) {
|
|
||||||
read_only = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mount_path = Path::new(&storage.mount_point);
|
let mount_path = Path::new(&storage.mount_point);
|
||||||
let metadata = mount_path.metadata().map_err(|err| {
|
let metadata = mount_path.metadata().map_err(|err| {
|
||||||
error!(logger, "failed to obtain metadata for mount path";
|
error!(logger, "failed to obtain metadata for mount path";
|
||||||
@ -809,24 +789,25 @@ pub fn is_mounted(mount_point: &str) -> Result<bool> {
|
|||||||
let found = fs::metadata(mount_point).is_ok()
|
let found = fs::metadata(mount_point).is_ok()
|
||||||
// Looks through /proc/mounts and check if the mount exists
|
// Looks through /proc/mounts and check if the mount exists
|
||||||
&& fs::read_to_string("/proc/mounts")?
|
&& fs::read_to_string("/proc/mounts")?
|
||||||
.lines()
|
.lines()
|
||||||
.any(|line| {
|
.any(|line| {
|
||||||
// The 2nd column reveals the mount point.
|
// The 2nd column reveals the mount point.
|
||||||
line.split_whitespace()
|
line.split_whitespace()
|
||||||
.nth(1)
|
.nth(1)
|
||||||
.map(|target| mount_point.eq(target))
|
.map(|target| mount_point.eq(target))
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(found)
|
Ok(found)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) {
|
fn parse_mount_flags_and_options<S: AsRef<str> + Debug>(options_vec: &[S]) -> (MsFlags, String) {
|
||||||
let mut flags = MsFlags::empty();
|
let mut flags = MsFlags::empty();
|
||||||
let mut options: String = "".to_string();
|
let mut options: String = "".to_string();
|
||||||
|
|
||||||
for opt in options_vec {
|
for opt in options_vec {
|
||||||
|
let opt = opt.as_ref();
|
||||||
if !opt.is_empty() {
|
if !opt.is_empty() {
|
||||||
match FLAGS.get(opt) {
|
match FLAGS.get(opt) {
|
||||||
Some(x) => {
|
Some(x) => {
|
||||||
@ -861,17 +842,16 @@ fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) {
|
|||||||
#[instrument]
|
#[instrument]
|
||||||
pub async fn add_storages(
|
pub async fn add_storages(
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
storages: Vec<Storage>,
|
storages: &[Storage],
|
||||||
sandbox: &Arc<Mutex<Sandbox>>,
|
sandbox: &Arc<Mutex<Sandbox>>,
|
||||||
cid: Option<String>,
|
cid: Option<String>,
|
||||||
) -> Result<Vec<String>> {
|
) -> Result<Vec<String>> {
|
||||||
let mut mount_list = Vec::new();
|
let mut mount_list = Vec::new();
|
||||||
|
|
||||||
for storage in storages {
|
for storage in storages {
|
||||||
let handler_name = storage.driver.clone();
|
let handler_name = &storage.driver;
|
||||||
let logger = logger.new(o!(
|
let logger =
|
||||||
"subsystem" => "storage",
|
logger.new(o!( "subsystem" => "storage", "storage-type" => handler_name.to_string()));
|
||||||
"storage-type" => handler_name.to_owned()));
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut sb = sandbox.lock().await;
|
let mut sb = sandbox.lock().await;
|
||||||
@ -882,22 +862,20 @@ pub async fn add_storages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let res = match handler_name.as_str() {
|
let res = match handler_name.as_str() {
|
||||||
DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_OVERLAYFS_TYPE => {
|
DRIVER_OVERLAYFS_TYPE => {
|
||||||
overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox).await
|
overlayfs_storage_handler(&logger, storage, cid.as_deref(), sandbox).await
|
||||||
}
|
}
|
||||||
DRIVER_MMIO_BLK_TYPE => {
|
DRIVER_MMIO_BLK_TYPE => virtiommio_blk_storage_handler(&logger, storage, sandbox).await,
|
||||||
virtiommio_blk_storage_handler(&logger, &storage, sandbox).await
|
DRIVER_LOCAL_TYPE => local_storage_handler(&logger, storage, sandbox).await,
|
||||||
}
|
DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox).await,
|
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, storage, sandbox).await,
|
||||||
DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, &storage, sandbox).await,
|
|
||||||
DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox).await,
|
|
||||||
DRIVER_WATCHABLE_BIND_TYPE => {
|
DRIVER_WATCHABLE_BIND_TYPE => {
|
||||||
bind_watcher_storage_handler(&logger, &storage, sandbox, cid.clone()).await?;
|
bind_watcher_storage_handler(&logger, storage, sandbox, cid.clone()).await?;
|
||||||
// Don't register watch mounts, they're handled separately by the watcher.
|
// Don't register watch mounts, they're handled separately by the watcher.
|
||||||
Ok(String::new())
|
Ok(String::new())
|
||||||
}
|
}
|
||||||
@ -916,9 +894,9 @@ pub async fn add_storages(
|
|||||||
"add_storages failed, storage: {:?}, error: {:?} ", storage, e
|
"add_storages failed, storage: {:?}, error: {:?} ", storage, e
|
||||||
);
|
);
|
||||||
let mut sb = sandbox.lock().await;
|
let mut sb = sandbox.lock().await;
|
||||||
sb.unset_sandbox_storage(&storage.mount_point)
|
if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) {
|
||||||
.map_err(|e| warn!(logger, "fail to unset sandbox storage {:?}", e))
|
warn!(logger, "fail to unset sandbox storage {:?}", e);
|
||||||
.ok();
|
}
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Ok(m) => m,
|
Ok(m) => m,
|
||||||
@ -934,11 +912,9 @@ pub async fn add_storages(
|
|||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
fn mount_to_rootfs(logger: &Logger, m: &InitMount) -> Result<()> {
|
fn mount_to_rootfs(logger: &Logger, m: &InitMount) -> Result<()> {
|
||||||
let options_vec: Vec<&str> = m.options.clone();
|
let (flags, options) = parse_mount_flags_and_options(&m.options);
|
||||||
|
|
||||||
let (flags, options) = parse_mount_flags_and_options(options_vec);
|
fs::create_dir_all(m.dest).context("could not create directory")?;
|
||||||
|
|
||||||
fs::create_dir_all(Path::new(m.dest)).context("could not create directory")?;
|
|
||||||
|
|
||||||
let source = Path::new(m.src);
|
let source = Path::new(m.src);
|
||||||
let dest = Path::new(m.dest);
|
let dest = Path::new(m.dest);
|
||||||
@ -1143,17 +1119,14 @@ fn ensure_destination_file_exists(path: &Path) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
fn parse_options(option_list: Vec<String>) -> HashMap<String, String> {
|
fn parse_options(option_list: &[String]) -> HashMap<String, String> {
|
||||||
let mut options = HashMap::new();
|
let mut options = HashMap::new();
|
||||||
for opt in option_list.iter() {
|
for opt in option_list.iter() {
|
||||||
let fields: Vec<&str> = opt.split('=').collect();
|
let fields: Vec<&str> = opt.split('=').collect();
|
||||||
if fields.len() != 2 {
|
if fields.len() == 2 {
|
||||||
continue;
|
options.insert(fields[0].to_string(), fields[1].to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
options.insert(fields[0].to_string(), fields[1].to_string());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
options
|
options
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2070,7 +2043,7 @@ mod tests {
|
|||||||
for (i, d) in tests.iter().enumerate() {
|
for (i, d) in tests.iter().enumerate() {
|
||||||
let msg = format!("test[{}]: {:?}", i, d);
|
let msg = format!("test[{}]: {:?}", i, d);
|
||||||
|
|
||||||
let result = parse_mount_flags_and_options(d.options_vec.clone());
|
let result = parse_mount_flags_and_options(&d.options_vec);
|
||||||
|
|
||||||
let msg = format!("{}: result: {:?}", msg, result);
|
let msg = format!("{}: result: {:?}", msg, result);
|
||||||
|
|
||||||
|
@ -169,7 +169,7 @@ impl AgentService {
|
|||||||
// updates the devices listed in the OCI spec, so that they actually
|
// updates the devices listed in the OCI spec, so that they actually
|
||||||
// match real devices inside the VM. This step is necessary since we
|
// match real devices inside the VM. This step is necessary since we
|
||||||
// cannot predict everything from the caller.
|
// cannot predict everything from the caller.
|
||||||
add_devices(&req.devices.to_vec(), &mut oci, &self.sandbox).await?;
|
add_devices(&req.devices, &mut oci, &self.sandbox).await?;
|
||||||
|
|
||||||
// Both rootfs and volumes (invoked with --volume for instance) will
|
// Both rootfs and volumes (invoked with --volume for instance) will
|
||||||
// be processed the same way. The idea is to always mount any provided
|
// be processed the same way. The idea is to always mount any provided
|
||||||
@ -180,7 +180,7 @@ impl AgentService {
|
|||||||
// list) to bind mount all of them inside the container.
|
// list) to bind mount all of them inside the container.
|
||||||
let m = add_storages(
|
let m = add_storages(
|
||||||
sl(),
|
sl(),
|
||||||
req.storages.to_vec(),
|
&req.storages,
|
||||||
&self.sandbox,
|
&self.sandbox,
|
||||||
Some(req.container_id.clone()),
|
Some(req.container_id.clone()),
|
||||||
)
|
)
|
||||||
@ -258,15 +258,13 @@ impl AgentService {
|
|||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
|
async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> {
|
||||||
let cid = req.container_id;
|
|
||||||
|
|
||||||
let mut s = self.sandbox.lock().await;
|
let mut s = self.sandbox.lock().await;
|
||||||
let sid = s.id.clone();
|
let sid = s.id.clone();
|
||||||
|
let cid = req.container_id;
|
||||||
|
|
||||||
let ctr = s
|
let ctr = s
|
||||||
.get_container(&cid)
|
.get_container(&cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
||||||
|
|
||||||
ctr.exec().await?;
|
ctr.exec().await?;
|
||||||
|
|
||||||
if sid == cid {
|
if sid == cid {
|
||||||
@ -274,13 +272,9 @@ impl AgentService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start oom event loop
|
// start oom event loop
|
||||||
|
if let Ok(cg_path) = ctr.cgroup_manager.as_ref().get_cgroup_path("memory") {
|
||||||
let cg_path = ctr.cgroup_manager.as_ref().get_cgroup_path("memory");
|
|
||||||
|
|
||||||
if let Ok(cg_path) = cg_path {
|
|
||||||
let rx = notifier::notify_oom(cid.as_str(), cg_path.to_string()).await?;
|
let rx = notifier::notify_oom(cid.as_str(), cg_path.to_string()).await?;
|
||||||
|
s.run_oom_event_monitor(rx, cid).await;
|
||||||
s.run_oom_event_monitor(rx, cid.clone()).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -291,64 +285,51 @@ impl AgentService {
|
|||||||
&self,
|
&self,
|
||||||
req: protocols::agent::RemoveContainerRequest,
|
req: protocols::agent::RemoveContainerRequest,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
|
|
||||||
if req.timeout == 0 {
|
if req.timeout == 0 {
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
|
||||||
sandbox.bind_watcher.remove_container(&cid).await;
|
sandbox.bind_watcher.remove_container(&cid).await;
|
||||||
|
|
||||||
sandbox
|
sandbox
|
||||||
.get_container(&cid)
|
.get_container(&cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id"))?
|
.ok_or_else(|| anyhow!("Invalid container id"))?
|
||||||
.destroy()
|
.destroy()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
remove_container_resources(&mut sandbox, &cid)?;
|
remove_container_resources(&mut sandbox, &cid)?;
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// timeout != 0
|
// timeout != 0
|
||||||
let s = self.sandbox.clone();
|
let s = self.sandbox.clone();
|
||||||
let cid2 = cid.clone();
|
let cid2 = cid.clone();
|
||||||
let (tx, rx) = tokio::sync::oneshot::channel::<i32>();
|
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let mut sandbox = s.lock().await;
|
let mut sandbox = s.lock().await;
|
||||||
if let Some(ctr) = sandbox.get_container(&cid2) {
|
sandbox.bind_watcher.remove_container(&cid2).await;
|
||||||
ctr.destroy().await.unwrap();
|
match sandbox.get_container(&cid2) {
|
||||||
sandbox.bind_watcher.remove_container(&cid2).await;
|
Some(ctr) => ctr.destroy().await,
|
||||||
tx.send(1).unwrap();
|
None => Err(anyhow!("Invalid container id")),
|
||||||
};
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if tokio::time::timeout(Duration::from_secs(req.timeout.into()), rx)
|
let to = Duration::from_secs(req.timeout.into());
|
||||||
.await
|
match tokio::time::timeout(to, handle).await {
|
||||||
.is_err()
|
Ok(res) => {
|
||||||
{
|
res??;
|
||||||
return Err(anyhow!(nix::Error::ETIME));
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
remove_container_resources(&mut sandbox, &cid)
|
||||||
|
}
|
||||||
|
Err(_e) => Err(anyhow!(nix::Error::ETIME)),
|
||||||
}
|
}
|
||||||
|
|
||||||
if handle.await.is_err() {
|
|
||||||
return Err(anyhow!(nix::Error::UnknownErrno));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
|
||||||
remove_container_resources(&mut sandbox, &cid)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
|
async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
let exec_id = req.exec_id.clone();
|
let exec_id = req.exec_id;
|
||||||
|
|
||||||
info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
|
info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
|
||||||
let mut process = req
|
let mut process = req
|
||||||
.process
|
.process
|
||||||
.into_option()
|
.into_option()
|
||||||
@ -365,21 +346,19 @@ impl AgentService {
|
|||||||
.get_container(&cid)
|
.get_container(&cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
.ok_or_else(|| anyhow!("Invalid container id"))?;
|
||||||
|
|
||||||
ctr.run(p).await?;
|
ctr.run(p).await
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
|
async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
let eid = req.exec_id.clone();
|
let eid = req.exec_id;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
sl(),
|
sl(),
|
||||||
"signal process";
|
"signal process";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
"signal" => req.signal,
|
"signal" => req.signal,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -400,8 +379,8 @@ impl AgentService {
|
|||||||
info!(
|
info!(
|
||||||
sl(),
|
sl(),
|
||||||
"signal encounter ESRCH, continue";
|
"signal encounter ESRCH, continue";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
"pid" => p.pid,
|
"pid" => p.pid,
|
||||||
"signal" => sig,
|
"signal" => sig,
|
||||||
);
|
);
|
||||||
@ -416,16 +395,16 @@ impl AgentService {
|
|||||||
info!(
|
info!(
|
||||||
sl(),
|
sl(),
|
||||||
"signal all the remaining processes";
|
"signal all the remaining processes";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await {
|
if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await {
|
||||||
warn!(
|
warn!(
|
||||||
sl(),
|
sl(),
|
||||||
"freeze cgroup failed";
|
"freeze cgroup failed";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
"error" => format!("{:?}", err),
|
"error" => format!("{:?}", err),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -437,8 +416,8 @@ impl AgentService {
|
|||||||
warn!(
|
warn!(
|
||||||
sl(),
|
sl(),
|
||||||
"signal failed";
|
"signal failed";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
"pid" => pid,
|
"pid" => pid,
|
||||||
"error" => format!("{:?}", err),
|
"error" => format!("{:?}", err),
|
||||||
);
|
);
|
||||||
@ -448,12 +427,13 @@ impl AgentService {
|
|||||||
warn!(
|
warn!(
|
||||||
sl(),
|
sl(),
|
||||||
"unfreeze cgroup failed";
|
"unfreeze cgroup failed";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone(),
|
"exec-id" => &eid,
|
||||||
"error" => format!("{:?}", err),
|
"error" => format!("{:?}", err),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -462,8 +442,7 @@ impl AgentService {
|
|||||||
let ctr = sandbox
|
let ctr = sandbox
|
||||||
.get_container(cid)
|
.get_container(cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
|
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
|
||||||
ctr.cgroup_manager.as_ref().freeze(state)?;
|
ctr.cgroup_manager.as_ref().freeze(state)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
|
async fn get_pids(&self, cid: &str) -> Result<Vec<i32>> {
|
||||||
@ -471,8 +450,7 @@ impl AgentService {
|
|||||||
let ctr = sandbox
|
let ctr = sandbox
|
||||||
.get_container(cid)
|
.get_container(cid)
|
||||||
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
|
.ok_or_else(|| anyhow!("Invalid container id {}", cid))?;
|
||||||
let pids = ctr.cgroup_manager.as_ref().get_pids()?;
|
ctr.cgroup_manager.as_ref().get_pids()
|
||||||
Ok(pids)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
@ -480,20 +458,19 @@ impl AgentService {
|
|||||||
&self,
|
&self,
|
||||||
req: protocols::agent::WaitProcessRequest,
|
req: protocols::agent::WaitProcessRequest,
|
||||||
) -> Result<protocols::agent::WaitProcessResponse> {
|
) -> Result<protocols::agent::WaitProcessResponse> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
let eid = req.exec_id;
|
let eid = req.exec_id;
|
||||||
let mut resp = WaitProcessResponse::new();
|
let mut resp = WaitProcessResponse::new();
|
||||||
let pid: pid_t;
|
|
||||||
|
|
||||||
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
sl(),
|
sl(),
|
||||||
"wait process";
|
"wait process";
|
||||||
"container-id" => cid.clone(),
|
"container-id" => &cid,
|
||||||
"exec-id" => eid.clone()
|
"exec-id" => &eid
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let pid: pid_t;
|
||||||
|
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
||||||
let exit_rx = {
|
let exit_rx = {
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
|
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
|
||||||
@ -548,8 +525,8 @@ impl AgentService {
|
|||||||
&self,
|
&self,
|
||||||
req: protocols::agent::WriteStreamRequest,
|
req: protocols::agent::WriteStreamRequest,
|
||||||
) -> Result<protocols::agent::WriteStreamResponse> {
|
) -> Result<protocols::agent::WriteStreamResponse> {
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
let eid = req.exec_id.clone();
|
let eid = req.exec_id;
|
||||||
|
|
||||||
let writer = {
|
let writer = {
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
@ -601,10 +578,6 @@ impl AgentService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if reader.is_none() {
|
|
||||||
return Err(anyhow!("Unable to determine stream reader, is None"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?;
|
let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@ -663,7 +636,6 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "remove_container", req);
|
trace_rpc_call!(ctx, "remove_container", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
|
|
||||||
match self.do_remove_container(req).await {
|
match self.do_remove_container(req).await {
|
||||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||||
Ok(_) => Ok(Empty::new()),
|
Ok(_) => Ok(Empty::new()),
|
||||||
@ -715,32 +687,23 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
trace_rpc_call!(ctx, "update_container", req);
|
trace_rpc_call!(ctx, "update_container", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
let cid = req.container_id.clone();
|
|
||||||
let res = req.resources;
|
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
||||||
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
|
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
"invalid container id".to_string(),
|
"invalid container id".to_string(),
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let resp = Empty::new();
|
if let Some(res) = req.resources.as_ref() {
|
||||||
|
|
||||||
if let Some(res) = res.as_ref() {
|
|
||||||
let oci_res = rustjail::resources_grpc_to_oci(res);
|
let oci_res = rustjail::resources_grpc_to_oci(res);
|
||||||
match ctr.set(oci_res) {
|
if let Err(e) = ctr.set(oci_res) {
|
||||||
Err(e) => {
|
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(_) => return Ok(resp),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(resp)
|
Ok(Empty::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stats_container(
|
async fn stats_container(
|
||||||
@ -750,10 +713,9 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<StatsContainerResponse> {
|
) -> ttrpc::Result<StatsContainerResponse> {
|
||||||
trace_rpc_call!(ctx, "stats_container", req);
|
trace_rpc_call!(ctx, "stats_container", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
let cid = req.container_id;
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
|
||||||
|
|
||||||
let ctr = sandbox.get_container(&cid).ok_or_else(|| {
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
"invalid container id".to_string(),
|
"invalid container id".to_string(),
|
||||||
@ -771,10 +733,9 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||||
trace_rpc_call!(ctx, "pause_container", req);
|
trace_rpc_call!(ctx, "pause_container", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
let cid = req.container_id();
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
|
||||||
|
|
||||||
let ctr = sandbox.get_container(cid).ok_or_else(|| {
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
"invalid container id".to_string(),
|
"invalid container id".to_string(),
|
||||||
@ -794,10 +755,9 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||||
trace_rpc_call!(ctx, "resume_container", req);
|
trace_rpc_call!(ctx, "resume_container", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
let cid = req.container_id();
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
|
||||||
|
|
||||||
let ctr = sandbox.get_container(cid).ok_or_else(|| {
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::INVALID_ARGUMENT,
|
ttrpc::Code::INVALID_ARGUMENT,
|
||||||
"invalid container id".to_string(),
|
"invalid container id".to_string(),
|
||||||
@ -874,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "close_stdin", req);
|
trace_rpc_call!(ctx, "close_stdin", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
|
|
||||||
let cid = req.container_id.clone();
|
let cid = req.container_id;
|
||||||
let eid = req.exec_id;
|
let eid = req.exec_id;
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
|
|
||||||
@ -900,11 +860,9 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
trace_rpc_call!(ctx, "tty_win_resize", req);
|
trace_rpc_call!(ctx, "tty_win_resize", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
|
|
||||||
let cid = req.container_id.clone();
|
|
||||||
let eid = req.exec_id.clone();
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
let p = sandbox
|
let p = sandbox
|
||||||
.find_container_process(cid.as_str(), eid.as_str())
|
.find_container_process(req.container_id(), req.exec_id())
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
ttrpc_error(
|
ttrpc_error(
|
||||||
ttrpc::Code::UNAVAILABLE,
|
ttrpc::Code::UNAVAILABLE,
|
||||||
@ -1259,7 +1217,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
match add_storages(sl(), req.storages.to_vec(), &self.sandbox, None).await {
|
match add_storages(sl(), &req.storages, &self.sandbox, None).await {
|
||||||
Ok(m) => {
|
Ok(m) => {
|
||||||
self.sandbox.lock().await.mounts = m;
|
self.sandbox.lock().await.mounts = m;
|
||||||
}
|
}
|
||||||
@ -1290,15 +1248,14 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
|
|
||||||
let mut sandbox = self.sandbox.lock().await;
|
let mut sandbox = self.sandbox.lock().await;
|
||||||
// destroy all containers, clean up, notify agent to exit
|
// destroy all containers, clean up, notify agent to exit etc.
|
||||||
// etc.
|
|
||||||
sandbox
|
sandbox
|
||||||
.destroy()
|
.destroy()
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||||
// Close get_oom_event connection,
|
// Close get_oom_event connection,
|
||||||
// otherwise it will block the shutdown of ttrpc.
|
// otherwise it will block the shutdown of ttrpc.
|
||||||
sandbox.event_tx.take();
|
drop(sandbox.event_tx.take());
|
||||||
|
|
||||||
sandbox
|
sandbox
|
||||||
.sender
|
.sender
|
||||||
@ -1355,9 +1312,9 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
ctx: &TtrpcContext,
|
ctx: &TtrpcContext,
|
||||||
req: protocols::agent::OnlineCPUMemRequest,
|
req: protocols::agent::OnlineCPUMemRequest,
|
||||||
) -> ttrpc::Result<Empty> {
|
) -> ttrpc::Result<Empty> {
|
||||||
|
trace_rpc_call!(ctx, "online_cpu_mem", req);
|
||||||
is_allowed(&req)?;
|
is_allowed(&req)?;
|
||||||
let sandbox = self.sandbox.lock().await;
|
let sandbox = self.sandbox.lock().await;
|
||||||
trace_rpc_call!(ctx, "online_cpu_mem", req);
|
|
||||||
|
|
||||||
sandbox
|
sandbox
|
||||||
.online_cpu_memory(&req)
|
.online_cpu_memory(&req)
|
||||||
@ -1506,7 +1463,6 @@ impl agent_ttrpc::AgentService for AgentService {
|
|||||||
|
|
||||||
info!(sl(), "get volume stats!");
|
info!(sl(), "get volume stats!");
|
||||||
let mut resp = VolumeStatsResponse::new();
|
let mut resp = VolumeStatsResponse::new();
|
||||||
|
|
||||||
let mut condition = VolumeCondition::new();
|
let mut condition = VolumeCondition::new();
|
||||||
|
|
||||||
match File::open(&req.volume_guest_path) {
|
match File::open(&req.volume_guest_path) {
|
||||||
@ -1698,15 +1654,10 @@ pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str, init_mode: bool) -> R
|
|||||||
sandbox: s,
|
sandbox: s,
|
||||||
init_mode,
|
init_mode,
|
||||||
}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
|
}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
|
||||||
|
let aservice = agent_ttrpc::create_agent_service(Arc::new(agent_service));
|
||||||
let agent_worker = Arc::new(agent_service);
|
|
||||||
|
|
||||||
let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
|
let health_service = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
|
||||||
let health_worker = Arc::new(health_service);
|
let hservice = health_ttrpc::create_health(Arc::new(health_service));
|
||||||
|
|
||||||
let aservice = agent_ttrpc::create_agent_service(agent_worker);
|
|
||||||
|
|
||||||
let hservice = health_ttrpc::create_health(health_worker);
|
|
||||||
|
|
||||||
let server = TtrpcServer::new()
|
let server = TtrpcServer::new()
|
||||||
.bind(server_address)?
|
.bind(server_address)?
|
||||||
@ -1750,6 +1701,7 @@ fn update_container_namespaces(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// update pid namespace
|
// update pid namespace
|
||||||
let mut pid_ns = LinuxNamespace {
|
let mut pid_ns = LinuxNamespace {
|
||||||
r#type: NSTYPEPID.to_string(),
|
r#type: NSTYPEPID.to_string(),
|
||||||
|
@ -3,14 +3,14 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
use crate::linux_abi::*;
|
use std::collections::HashMap;
|
||||||
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
|
use std::fs;
|
||||||
use crate::namespace::Namespace;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use crate::netlink::Handle;
|
use std::path::Path;
|
||||||
use crate::network::Network;
|
use std::str::FromStr;
|
||||||
use crate::pci;
|
use std::sync::Arc;
|
||||||
use crate::uevent::{Uevent, UeventMatcher};
|
use std::{thread, time};
|
||||||
use crate::watcher::BindWatcher;
|
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use kata_types::cpu::CpuSet;
|
use kata_types::cpu::CpuSet;
|
||||||
use libc::pid_t;
|
use libc::pid_t;
|
||||||
@ -22,18 +22,20 @@ use rustjail::container::BaseContainer;
|
|||||||
use rustjail::container::LinuxContainer;
|
use rustjail::container::LinuxContainer;
|
||||||
use rustjail::process::Process;
|
use rustjail::process::Process;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fs;
|
|
||||||
use std::os::unix::fs::PermissionsExt;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::str::FromStr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::{thread, time};
|
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
|
|
||||||
|
use crate::linux_abi::*;
|
||||||
|
use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS};
|
||||||
|
use crate::namespace::Namespace;
|
||||||
|
use crate::netlink::Handle;
|
||||||
|
use crate::network::Network;
|
||||||
|
use crate::pci;
|
||||||
|
use crate::uevent::{Uevent, UeventMatcher};
|
||||||
|
use crate::watcher::BindWatcher;
|
||||||
|
|
||||||
pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
|
pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id";
|
||||||
|
|
||||||
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
|
type UeventWatcher = (Box<dyn UeventMatcher>, oneshot::Sender<Uevent>);
|
||||||
@ -103,9 +105,6 @@ impl Sandbox {
|
|||||||
// This method also returns a boolean to let
|
// This method also returns a boolean to let
|
||||||
// callers know if the storage already existed or not.
|
// callers know if the storage already existed or not.
|
||||||
// It will return true if storage is new.
|
// It will return true if storage is new.
|
||||||
//
|
|
||||||
// It's assumed that caller is calling this method after
|
|
||||||
// acquiring a lock on sandbox.
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
|
pub fn set_sandbox_storage(&mut self, path: &str) -> bool {
|
||||||
match self.storages.get_mut(path) {
|
match self.storages.get_mut(path) {
|
||||||
@ -126,16 +125,13 @@ impl Sandbox {
|
|||||||
// storage reference from the sandbox and return 'true' to
|
// storage reference from the sandbox and return 'true' to
|
||||||
// let the caller know that they can clean up the storage
|
// let the caller know that they can clean up the storage
|
||||||
// related directories by calling remove_sandbox_storage
|
// related directories by calling remove_sandbox_storage
|
||||||
//
|
|
||||||
// It's assumed that caller is calling this method after
|
|
||||||
// acquiring a lock on sandbox.
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
|
pub fn unset_sandbox_storage(&mut self, path: &str) -> Result<bool> {
|
||||||
match self.storages.get_mut(path) {
|
match self.storages.get_mut(path) {
|
||||||
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
|
None => Err(anyhow!("Sandbox storage with path {} not found", path)),
|
||||||
Some(count) => {
|
Some(count) => {
|
||||||
*count -= 1;
|
*count -= 1;
|
||||||
if *count < 1 {
|
if *count == 0 {
|
||||||
self.storages.remove(path);
|
self.storages.remove(path);
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
@ -146,11 +142,8 @@ impl Sandbox {
|
|||||||
|
|
||||||
// remove_sandbox_storage removes the sandbox storage if no
|
// remove_sandbox_storage removes the sandbox storage if no
|
||||||
// containers are using that storage.
|
// containers are using that storage.
|
||||||
//
|
|
||||||
// It's assumed that caller is calling this method after
|
|
||||||
// acquiring a lock on sandbox.
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub fn remove_sandbox_storage(&self, path: &str) -> Result<()> {
|
pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
||||||
let mounts = vec![path.to_string()];
|
let mounts = vec![path.to_string()];
|
||||||
remove_mounts(&mounts)?;
|
remove_mounts(&mounts)?;
|
||||||
// "remove_dir" will fail if the mount point is backed by a read-only filesystem.
|
// "remove_dir" will fail if the mount point is backed by a read-only filesystem.
|
||||||
@ -165,9 +158,6 @@ impl Sandbox {
|
|||||||
// unset_and_remove_sandbox_storage unsets the storage from sandbox
|
// unset_and_remove_sandbox_storage unsets the storage from sandbox
|
||||||
// and if there are no containers using this storage it will
|
// and if there are no containers using this storage it will
|
||||||
// remove it from the sandbox.
|
// remove it from the sandbox.
|
||||||
//
|
|
||||||
// It's assumed that caller is calling this method after
|
|
||||||
// acquiring a lock on sandbox.
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> {
|
||||||
if self.unset_sandbox_storage(path)? {
|
if self.unset_sandbox_storage(path)? {
|
||||||
@ -184,22 +174,18 @@ impl Sandbox {
|
|||||||
.get_ipc()
|
.get_ipc()
|
||||||
.setup()
|
.setup()
|
||||||
.await
|
.await
|
||||||
.context("Failed to setup persistent IPC namespace")?;
|
.context("setup persistent IPC namespace")?;
|
||||||
|
|
||||||
// // Set up shared UTS namespace
|
// // Set up shared UTS namespace
|
||||||
self.shared_utsns = Namespace::new(&self.logger)
|
self.shared_utsns = Namespace::new(&self.logger)
|
||||||
.get_uts(self.hostname.as_str())
|
.get_uts(self.hostname.as_str())
|
||||||
.setup()
|
.setup()
|
||||||
.await
|
.await
|
||||||
.context("Failed to setup persistent UTS namespace")?;
|
.context("setup persistent UTS namespace")?;
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_container(&mut self, c: LinuxContainer) {
|
|
||||||
self.containers.insert(c.id.clone(), c);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> {
|
pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> {
|
||||||
// Populate the shared pid path only if this is an infra container and
|
// Populate the shared pid path only if this is an infra container and
|
||||||
@ -224,14 +210,18 @@ impl Sandbox {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_container(&mut self, c: LinuxContainer) {
|
||||||
|
self.containers.insert(c.id.clone(), c);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> {
|
pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> {
|
||||||
self.containers.get_mut(id)
|
self.containers.get_mut(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> {
|
pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> {
|
||||||
for (_, c) in self.containers.iter_mut() {
|
for (_, c) in self.containers.iter_mut() {
|
||||||
if c.processes.get(&pid).is_some() {
|
if let Some(p) = c.processes.get_mut(&pid) {
|
||||||
return c.processes.get_mut(&pid);
|
return Some(p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,25 +270,17 @@ impl Sandbox {
|
|||||||
let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?;
|
let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?;
|
||||||
|
|
||||||
for (_, ctr) in self.containers.iter() {
|
for (_, ctr) in self.containers.iter() {
|
||||||
let cpu = ctr
|
if let Some(spec) = ctr.config.spec.as_ref() {
|
||||||
.config
|
if let Some(linux) = spec.linux.as_ref() {
|
||||||
.spec
|
if let Some(resources) = linux.resources.as_ref() {
|
||||||
.as_ref()
|
if let Some(cpus) = resources.cpu.as_ref() {
|
||||||
.unwrap()
|
info!(self.logger, "updating {}", ctr.id.as_str());
|
||||||
.linux
|
ctr.cgroup_manager
|
||||||
.as_ref()
|
.update_cpuset_path(guest_cpuset.as_str(), &cpus.cpus)?;
|
||||||
.unwrap()
|
}
|
||||||
.resources
|
}
|
||||||
.as_ref()
|
}
|
||||||
.unwrap()
|
}
|
||||||
.cpu
|
|
||||||
.as_ref();
|
|
||||||
let container_cpust = if let Some(c) = cpu { &c.cpus } else { "" };
|
|
||||||
|
|
||||||
info!(self.logger, "updating {}", ctr.id.as_str());
|
|
||||||
ctr.cgroup_manager
|
|
||||||
.as_ref()
|
|
||||||
.update_cpuset_path(guest_cpuset.as_str(), container_cpust)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -360,31 +342,28 @@ impl Sandbox {
|
|||||||
#[instrument]
|
#[instrument]
|
||||||
pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) {
|
pub async fn run_oom_event_monitor(&self, mut rx: Receiver<String>, container_id: String) {
|
||||||
let logger = self.logger.clone();
|
let logger = self.logger.clone();
|
||||||
|
let tx = match self.event_tx.as_ref() {
|
||||||
if self.event_tx.is_none() {
|
Some(v) => v.clone(),
|
||||||
error!(
|
None => {
|
||||||
logger,
|
error!(
|
||||||
"sandbox.event_tx not found in run_oom_event_monitor"
|
logger,
|
||||||
);
|
"sandbox.event_tx not found in run_oom_event_monitor"
|
||||||
return;
|
);
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
let tx = self.event_tx.as_ref().unwrap().clone();
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
let event = rx.recv().await;
|
let event = rx.recv().await;
|
||||||
// None means the container has exited,
|
// None means the container has exited, and sender in OOM notifier is dropped.
|
||||||
// and sender in OOM notifier is dropped.
|
|
||||||
if event.is_none() {
|
if event.is_none() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
info!(logger, "got an OOM event {:?}", event);
|
info!(logger, "got an OOM event {:?}", event);
|
||||||
|
if let Err(e) = tx.send(container_id.clone()).await {
|
||||||
let _ = tx
|
error!(logger, "failed to send message: {:?}", e);
|
||||||
.send(container_id.clone())
|
}
|
||||||
.await
|
|
||||||
.map_err(|e| error!(logger, "failed to send message: {:?}", e));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -397,39 +376,36 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Res
|
|||||||
|
|
||||||
for e in fs::read_dir(path)? {
|
for e in fs::read_dir(path)? {
|
||||||
let entry = e?;
|
let entry = e?;
|
||||||
let tmpname = entry.file_name();
|
// Skip direntry which doesn't match the pattern.
|
||||||
let name = tmpname.to_str().unwrap();
|
match entry.file_name().to_str() {
|
||||||
let p = entry.path();
|
None => continue,
|
||||||
|
Some(v) => {
|
||||||
if re.is_match(name) {
|
if !re.is_match(v) {
|
||||||
let file = format!("{}/{}", p.to_str().unwrap(), SYSFS_ONLINE_FILE);
|
|
||||||
info!(logger, "{}", file.as_str());
|
|
||||||
|
|
||||||
let c = fs::read_to_string(file.as_str());
|
|
||||||
if c.is_err() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let c = c.unwrap();
|
|
||||||
|
|
||||||
if c.trim().contains('0') {
|
|
||||||
let r = fs::write(file.as_str(), "1");
|
|
||||||
if r.is_err() {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
count += 1;
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if num > 0 && count == num {
|
let p = entry.path().join(SYSFS_ONLINE_FILE);
|
||||||
|
if let Ok(c) = fs::read_to_string(&p) {
|
||||||
|
// Try to online the object in offline state.
|
||||||
|
if c.trim().contains('0') && fs::write(&p, "1").is_ok() && num > 0 {
|
||||||
|
count += 1;
|
||||||
|
if count == num {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if num > 0 {
|
Ok(count)
|
||||||
return Ok(count);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(0)
|
#[instrument]
|
||||||
|
fn online_memory(logger: &Logger) -> Result<()> {
|
||||||
|
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
|
||||||
|
.context("online memory resource")?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// max wait for all CPUs to online will use 50 * 100 = 5 seconds.
|
// max wait for all CPUs to online will use 50 * 100 = 5 seconds.
|
||||||
@ -473,13 +449,6 @@ fn online_cpus(logger: &Logger, num: i32) -> Result<i32> {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
|
||||||
fn online_memory(logger: &Logger) -> Result<()> {
|
|
||||||
online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
|
|
||||||
.context("online memory resource")?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn onlined_cpus() -> Result<i32> {
|
fn onlined_cpus() -> Result<i32> {
|
||||||
let content =
|
let content =
|
||||||
fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?;
|
fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?;
|
||||||
@ -555,7 +524,7 @@ mod tests {
|
|||||||
skip_if_not_root!();
|
skip_if_not_root!();
|
||||||
|
|
||||||
let logger = slog::Logger::root(slog::Discard, o!());
|
let logger = slog::Logger::root(slog::Discard, o!());
|
||||||
let s = Sandbox::new(&logger).unwrap();
|
let mut s = Sandbox::new(&logger).unwrap();
|
||||||
|
|
||||||
let tmpdir = Builder::new().tempdir().unwrap();
|
let tmpdir = Builder::new().tempdir().unwrap();
|
||||||
let tmpdir_path = tmpdir.path().to_str().unwrap();
|
let tmpdir_path = tmpdir.path().to_str().unwrap();
|
||||||
|
Loading…
Reference in New Issue
Block a user