mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-09-17 14:58:16 +00:00
runtime-rs: Add guest swap support
This commit add guest swap support. When configuration enable_guest_swap is enabled, runtime-rs will start a swap task. When the VM start or update the guest memory, the swap task will be waked up to create and insert a swap file. Before this job, swap task will sleep some seconds (set by configuration guest_swap_create_threshold_secs) to reduce the impact on guest kernel boot performance and prevent the insertion of multiple swap files due to frequent memory elasticity within a short period. The size of swap file is set by configuration guest_swap_size_percent. The percentage of the total memory to be used as swap device. Fixes: #10988 Signed-off-by: Hui Zhu <teawater@antgroup.com>
This commit is contained in:
@@ -166,12 +166,6 @@ impl ConfigPlugin for DragonballConfig {
|
||||
return Err(eother!("dragonball hypervisor does not support pflashes"));
|
||||
}
|
||||
|
||||
if db.memory_info.enable_guest_swap {
|
||||
return Err(eother!(
|
||||
"dragonball hypervisor doesn't support enable_guest_swap"
|
||||
));
|
||||
}
|
||||
|
||||
if db.security_info.rootless {
|
||||
return Err(eother!(
|
||||
"dragonball hypervisor does not support rootless mode"
|
||||
|
@@ -710,16 +710,37 @@ pub struct MemoryInfo {
|
||||
|
||||
/// Enable swap in the guest. Default false.
|
||||
///
|
||||
/// When enable_guest_swap is enabled, insert a raw file to the guest as the swap device if the
|
||||
/// swappiness of a container (set by annotation "io.katacontainers.container.resource.swappiness")
|
||||
/// is bigger than 0.
|
||||
///
|
||||
/// The size of the swap device should be swap_in_bytes (set by annotation
|
||||
/// "io.katacontainers.container.resource.swap_in_bytes") - memory_limit_in_bytes.
|
||||
/// If swap_in_bytes is not set, the size should be memory_limit_in_bytes.
|
||||
/// If swap_in_bytes and memory_limit_in_bytes is not set, the size should be default_memory.
|
||||
/// When enable_guest_swap is enabled, insert a raw file to the guest as the swap device.
|
||||
#[serde(default)]
|
||||
pub enable_guest_swap: bool,
|
||||
|
||||
/// If enable_guest_swap is enabled, the swap device will be created in the guest
|
||||
/// at this path. Default "/run/kata-containers/swap".
|
||||
#[serde(default = "default_guest_swap_path")]
|
||||
pub guest_swap_path: String,
|
||||
|
||||
/// The percentage of the total memory to be used as swap device.
|
||||
/// Default 100.
|
||||
#[serde(default = "default_guest_swap_size_percent")]
|
||||
pub guest_swap_size_percent: u64,
|
||||
|
||||
/// The threshold in seconds to create swap device in the guest.
|
||||
/// Kata will wait guest_swap_create_threshold_secs seconds before creating swap device.
|
||||
/// Default 60.
|
||||
#[serde(default = "default_guest_swap_create_threshold_secs")]
|
||||
pub guest_swap_create_threshold_secs: u64,
|
||||
}
|
||||
|
||||
fn default_guest_swap_size_percent() -> u64 {
|
||||
100
|
||||
}
|
||||
|
||||
fn default_guest_swap_path() -> String {
|
||||
"/run/kata-containers/swap".to_string()
|
||||
}
|
||||
|
||||
fn default_guest_swap_create_threshold_secs() -> u64 {
|
||||
60
|
||||
}
|
||||
|
||||
impl MemoryInfo {
|
||||
|
@@ -149,6 +149,12 @@ impl ConfigPlugin for QemuConfig {
|
||||
MIN_QEMU_MEMORY_SIZE_MB
|
||||
));
|
||||
}
|
||||
|
||||
if qemu.memory_info.enable_guest_swap {
|
||||
return Err(eother!(
|
||||
"Qemu hypervisor doesn't support enable_guest_swap"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@@ -123,5 +123,6 @@ impl_agent!(
|
||||
resize_volume | crate::ResizeVolumeRequest | crate::Empty | None,
|
||||
online_cpu_mem | crate::OnlineCPUMemRequest | crate::Empty | None,
|
||||
get_metrics | crate::Empty | crate::MetricsResponse | None,
|
||||
get_guest_details | crate::GetGuestDetailsRequest | crate::GuestDetailsResponse | None
|
||||
get_guest_details | crate::GetGuestDetailsRequest | crate::GuestDetailsResponse | None,
|
||||
add_swap | crate::AddSwapRequest | crate::Empty | None
|
||||
);
|
||||
|
@@ -13,7 +13,7 @@ use protocols::{
|
||||
|
||||
use crate::{
|
||||
types::{
|
||||
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, AgentDetails, BlkioStats,
|
||||
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, AddSwapRequest, AgentDetails, BlkioStats,
|
||||
BlkioStatsEntry, CgroupStats, CheckRequest, CloseStdinRequest, ContainerID,
|
||||
CopyFileRequest, CpuStats, CpuUsage, CreateContainerRequest, CreateSandboxRequest, Device,
|
||||
Empty, ExecProcessRequest, FSGroup, FSGroupChangePolicy, GetIPTablesRequest,
|
||||
@@ -875,3 +875,12 @@ impl From<ResizeVolumeRequest> for agent::ResizeVolumeRequest {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AddSwapRequest> for agent::AddSwapRequest {
|
||||
fn from(from: AddSwapRequest) -> Self {
|
||||
Self {
|
||||
PCIPath: from.pci_path,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -14,18 +14,18 @@ mod log_forwarder;
|
||||
mod sock;
|
||||
pub mod types;
|
||||
pub use types::{
|
||||
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, BlkioStatsEntry, CheckRequest,
|
||||
CloseStdinRequest, ContainerID, ContainerProcessID, CopyFileRequest, CreateContainerRequest,
|
||||
CreateSandboxRequest, Empty, ExecProcessRequest, GetGuestDetailsRequest, GetIPTablesRequest,
|
||||
GetIPTablesResponse, GuestDetailsResponse, HealthCheckResponse, IPAddress, IPFamily, Interface,
|
||||
Interfaces, ListProcessesRequest, MemHotplugByProbeRequest, MetricsResponse,
|
||||
OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, ReadStreamResponse,
|
||||
RemoveContainerRequest, ReseedRandomDevRequest, ResizeVolumeRequest, Route, Routes,
|
||||
SetGuestDateTimeRequest, SetIPTablesRequest, SetIPTablesResponse, SignalProcessRequest,
|
||||
StatsContainerResponse, Storage, TtyWinResizeRequest, UpdateContainerRequest,
|
||||
UpdateInterfaceRequest, UpdateRoutesRequest, VersionCheckResponse, VolumeStatsRequest,
|
||||
VolumeStatsResponse, WaitProcessRequest, WaitProcessResponse, WriteStreamRequest,
|
||||
WriteStreamResponse,
|
||||
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, AddSwapRequest, BlkioStatsEntry,
|
||||
CheckRequest, CloseStdinRequest, ContainerID, ContainerProcessID, CopyFileRequest,
|
||||
CreateContainerRequest, CreateSandboxRequest, Empty, ExecProcessRequest,
|
||||
GetGuestDetailsRequest, GetIPTablesRequest, GetIPTablesResponse, GuestDetailsResponse,
|
||||
HealthCheckResponse, IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest,
|
||||
MemHotplugByProbeRequest, MetricsResponse, OnlineCPUMemRequest, OomEventResponse,
|
||||
ReadStreamRequest, ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest,
|
||||
ResizeVolumeRequest, Route, Routes, SetGuestDateTimeRequest, SetIPTablesRequest,
|
||||
SetIPTablesResponse, SignalProcessRequest, StatsContainerResponse, Storage,
|
||||
TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest,
|
||||
VersionCheckResponse, VolumeStatsRequest, VolumeStatsResponse, WaitProcessRequest,
|
||||
WaitProcessResponse, WriteStreamRequest, WriteStreamResponse,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -94,4 +94,5 @@ pub trait Agent: AgentManager + HealthService + Send + Sync {
|
||||
async fn get_volume_stats(&self, req: VolumeStatsRequest) -> Result<VolumeStatsResponse>;
|
||||
async fn resize_volume(&self, req: ResizeVolumeRequest) -> Result<Empty>;
|
||||
async fn get_guest_details(&self, req: GetGuestDetailsRequest) -> Result<GuestDetailsResponse>;
|
||||
async fn add_swap(&self, req: AddSwapRequest) -> Result<Empty>;
|
||||
}
|
||||
|
@@ -604,6 +604,11 @@ pub struct VolumeStatsResponse {
|
||||
pub data: String,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Default, Debug)]
|
||||
pub struct AddSwapRequest {
|
||||
pub pci_path: Vec<u32>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::convert::TryFrom;
|
||||
|
@@ -46,11 +46,7 @@ impl MemResource {
|
||||
.await
|
||||
.context("update container memory resources")?;
|
||||
// the unit here is MB
|
||||
let mut mem_sb_mb = self
|
||||
.total_mems()
|
||||
.await
|
||||
.context("failed to calculate total memory requirement for containers")?;
|
||||
mem_sb_mb += self.orig_toml_default_mem;
|
||||
let mem_sb_mb = self.get_current_mb().await?;
|
||||
info!(sl!(), "calculate mem_sb_mb {}", mem_sb_mb);
|
||||
|
||||
let _curr_mem = self
|
||||
@@ -61,6 +57,16 @@ impl MemResource {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_current_mb(&self) -> Result<u32> {
|
||||
let mut mem_sb_mb = self
|
||||
.total_mems()
|
||||
.await
|
||||
.context("failed to calculate total memory requirement for containers")?;
|
||||
mem_sb_mb += self.orig_toml_default_mem;
|
||||
|
||||
Ok(mem_sb_mb)
|
||||
}
|
||||
|
||||
async fn total_mems(&self) -> Result<u32> {
|
||||
let mut mem_sandbox = 0;
|
||||
let resources = self.container_mem_resources.read().await;
|
||||
|
@@ -7,3 +7,4 @@
|
||||
pub mod cpu;
|
||||
pub mod initial_size;
|
||||
pub mod mem;
|
||||
pub mod swap;
|
||||
|
543
src/runtime-rs/crates/resource/src/cpu_mem/swap.rs
Normal file
543
src/runtime-rs/crates/resource/src/cpu_mem/swap.rs
Normal file
@@ -0,0 +1,543 @@
|
||||
// Copyright (c) 2025 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use agent::Agent;
|
||||
use anyhow::{anyhow, Context, Error, Result};
|
||||
use hypervisor::{
|
||||
device::{
|
||||
device_manager::{do_handle_device, get_block_driver, DeviceManager},
|
||||
DeviceConfig, DeviceType,
|
||||
},
|
||||
BlockConfig,
|
||||
};
|
||||
use nix::sched::sched_yield;
|
||||
use nix::sys::statvfs::statvfs;
|
||||
use std::fmt;
|
||||
use std::fs::Permissions;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::fs;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::cpu_mem::mem::MemResource;
|
||||
|
||||
const CHUNK_SIZE: usize = 1024 * 1024;
|
||||
const ERROR_WAIT_SECS: u64 = 120;
|
||||
const ONE_MB: usize = 1024 * 1024;
|
||||
const ERROR_RETRY_TIMES_MAX: usize = 2;
|
||||
|
||||
async fn check_disk_size(path: &Path, mut size: usize) -> Result<()> {
|
||||
let task_path = path.to_path_buf();
|
||||
|
||||
let stat = spawn_blocking(move || statvfs(&task_path))
|
||||
.await
|
||||
.context("spawn_blocking")?
|
||||
.context("statvfs")?;
|
||||
|
||||
let available_space = stat.blocks_available() * stat.block_size();
|
||||
|
||||
size += ONE_MB * 1024;
|
||||
|
||||
if available_space < size as u64 {
|
||||
Err(anyhow::anyhow!(
|
||||
"Not enough space on disk to create swap file {:?}",
|
||||
path.to_path_buf()
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_mkswap() -> Result<()> {
|
||||
Command::new("mkswap").arg("--help").output().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Core {
|
||||
current_swap_size: usize,
|
||||
next_swap_id: usize,
|
||||
stopped: bool,
|
||||
}
|
||||
|
||||
impl Core {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
current_swap_size: 0,
|
||||
next_swap_id: 0,
|
||||
stopped: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_next_swap_id(&mut self) {
|
||||
self.next_swap_id += 1;
|
||||
}
|
||||
|
||||
fn plus_swap_size(&mut self, size: usize) {
|
||||
self.current_swap_size += size;
|
||||
}
|
||||
|
||||
fn stop(&mut self) {
|
||||
self.stopped = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SwapTask {
|
||||
path: PathBuf,
|
||||
size_percent: usize,
|
||||
create_threshold_secs: u64,
|
||||
core: Arc<Mutex<Core>>,
|
||||
wake_rx: Arc<Mutex<mpsc::Receiver<()>>>,
|
||||
mem: MemResource,
|
||||
agent: Arc<dyn Agent>,
|
||||
device_manager: Arc<RwLock<DeviceManager>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct StStop {
|
||||
need_remove: bool,
|
||||
}
|
||||
impl StStop {
|
||||
fn get_error(need_remove: bool) -> Error {
|
||||
Self { need_remove }.into()
|
||||
}
|
||||
}
|
||||
impl fmt::Display for StStop {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "swap_task stop. Need remove {}", self.need_remove)
|
||||
}
|
||||
}
|
||||
impl std::error::Error for StStop {}
|
||||
|
||||
impl SwapTask {
|
||||
// Return true if need remove runtime_path.join(format!("swap{}", core.lock().unwrap().next_swap_id))
|
||||
async fn run(&mut self) -> Result<()> {
|
||||
sleep(Duration::from_secs(self.create_threshold_secs)).await;
|
||||
|
||||
if self.should_stop(true).await {
|
||||
return Err(StStop::get_error(false));
|
||||
}
|
||||
|
||||
let current_size = self.mem.get_current_mb().await.context("get_current_mb")? as usize
|
||||
* ONE_MB
|
||||
* self.size_percent
|
||||
/ 100;
|
||||
let current_swap_size = { self.core.lock().await.current_swap_size };
|
||||
|
||||
if current_size <= current_swap_size {
|
||||
debug!(
|
||||
sl!(),
|
||||
"swap_task: current memory {} current swap {}, stop",
|
||||
current_size,
|
||||
current_swap_size
|
||||
);
|
||||
return Err(StStop::get_error(false));
|
||||
}
|
||||
|
||||
let swap_size = current_size - current_swap_size;
|
||||
let swap_path = self.get_swap_path().await;
|
||||
|
||||
self.create_disk(swap_size, &swap_path).await?;
|
||||
|
||||
let swap_path = swap_path.to_string_lossy().to_string();
|
||||
|
||||
// Add swap file to sandbox
|
||||
let block_driver = get_block_driver(&self.device_manager).await;
|
||||
let dev_info = DeviceConfig::BlockCfg(BlockConfig {
|
||||
path_on_host: swap_path.clone(),
|
||||
driver_option: block_driver,
|
||||
is_direct: Some(true),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
if self.should_stop(false).await {
|
||||
return Err(StStop::get_error(true));
|
||||
}
|
||||
|
||||
let device_info = do_handle_device(&self.device_manager.clone(), &dev_info)
|
||||
.await
|
||||
.context("do_handle_device")?;
|
||||
let device_id = match device_info {
|
||||
DeviceType::Block(ref bdev) => bdev.device_id.clone(),
|
||||
_ => return Err(anyhow!("swap_task: device_info {} is not Block", swap_path)),
|
||||
};
|
||||
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
|
||||
if self.should_stop(false).await {
|
||||
if let Err(e1) = self
|
||||
.device_manager
|
||||
.write()
|
||||
.await
|
||||
.try_remove_device(&device_id)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
sl!(),
|
||||
"swap_task: try_remove_device {} fail: {:?}", swap_path, e1
|
||||
);
|
||||
}
|
||||
|
||||
return Err(StStop::get_error(true));
|
||||
}
|
||||
|
||||
if let DeviceType::Block(device) = device_info {
|
||||
if let Some(pci_path) = device.config.pci_path.clone() {
|
||||
if let Err(e) = self
|
||||
.agent
|
||||
.add_swap(agent::types::AddSwapRequest {
|
||||
pci_path: pci_path.slots.iter().map(|slot| slot.0 as u32).collect(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
if let Err(e1) = self
|
||||
.device_manager
|
||||
.write()
|
||||
.await
|
||||
.try_remove_device(&device_id)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
sl!(),
|
||||
"swap_task: try_remove_device {} fail: {:?}", swap_path, e1
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow!("swap_task: agent.swap_add failed: {:?}", e));
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow!(
|
||||
"swap_task: device_info {} pci_path is None",
|
||||
swap_path
|
||||
));
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow!("swap_task: device_info {} is not Block", swap_path));
|
||||
}
|
||||
|
||||
let mut core_lock = self.core.lock().await;
|
||||
core_lock.update_next_swap_id();
|
||||
core_lock.plus_swap_size(swap_size);
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
"swap_task: swap file {:?} {} inserted", swap_path, swap_size
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_disk(&mut self, swap_size: usize, swap_path: &PathBuf) -> Result<()> {
|
||||
debug!(
|
||||
sl!(),
|
||||
"swap_task: create_disk {:?} {} start", swap_path, swap_size
|
||||
);
|
||||
|
||||
check_disk_size(&self.path, swap_size)
|
||||
.await
|
||||
.context("check_disk_size")?;
|
||||
|
||||
debug!(
|
||||
sl!(),
|
||||
"swap_task: create swap file {:?} {}", swap_path, swap_size
|
||||
);
|
||||
let mut file = File::create(swap_path)
|
||||
.await
|
||||
.context(format!("swap: File::create {:?}", swap_path))?;
|
||||
fs::set_permissions(swap_path, Permissions::from_mode(0o700))
|
||||
.await
|
||||
.context(format!("swap: File::set_permissions {:?}", swap_path))?;
|
||||
|
||||
let buffer = vec![0; CHUNK_SIZE];
|
||||
let mut total_written = 0;
|
||||
while total_written < swap_size {
|
||||
spawn_blocking(sched_yield)
|
||||
.await
|
||||
.context("swap_task: task::spawn_blocking")?
|
||||
.context("swap_task: sched_yield")?;
|
||||
|
||||
if self.should_stop(false).await {
|
||||
return Err(StStop::get_error(true));
|
||||
}
|
||||
|
||||
let remaining = swap_size - total_written;
|
||||
let write_size = std::cmp::min(remaining, CHUNK_SIZE);
|
||||
file.write_all(&buffer[..write_size])
|
||||
.await
|
||||
.context("file.write_all")?;
|
||||
total_written += write_size;
|
||||
}
|
||||
|
||||
if self.should_stop(false).await {
|
||||
return Err(StStop::get_error(true));
|
||||
}
|
||||
|
||||
file.flush().await.context("file.flush")?;
|
||||
drop(file);
|
||||
|
||||
if self.should_stop(false).await {
|
||||
return Err(StStop::get_error(true));
|
||||
}
|
||||
|
||||
let output = Command::new("mkswap")
|
||||
.arg(swap_path)
|
||||
.output()
|
||||
.await
|
||||
.context("mkswap command")?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"mkswap command fail: {}",
|
||||
String::from_utf8(output.stdout).unwrap_or_default()
|
||||
));
|
||||
}
|
||||
|
||||
debug!(
|
||||
sl!(),
|
||||
"swap_task: created swap file {:?} {}", swap_path, swap_size
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn should_stop(&self, clear_wake: bool) -> bool {
|
||||
if clear_wake {
|
||||
if let Err(e) = self.wake_rx.lock().await.try_recv() {
|
||||
error!(
|
||||
sl!(),
|
||||
"swap_task: should_keep_run wake_rx.try_recv() {:?}", e
|
||||
);
|
||||
}
|
||||
}
|
||||
self.core.lock().await.stopped
|
||||
}
|
||||
|
||||
// Return true if thread should stop
|
||||
async fn wait_wake(&self) -> bool {
|
||||
{
|
||||
if self.core.lock().await.stopped {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
if self.wake_rx.lock().await.recv().await.is_none() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
self.core.lock().await.stopped
|
||||
}
|
||||
|
||||
async fn get_swap_path(&self) -> PathBuf {
|
||||
let id = self.core.lock().await.next_swap_id;
|
||||
self.path.join(format!("swap{}", id))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct SwapResourceInner {
|
||||
wake_tx: mpsc::Sender<()>,
|
||||
core: Arc<Mutex<Core>>,
|
||||
swap_task_handle: Arc<Mutex<task::JoinHandle<()>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SwapResource {
|
||||
runtime_path: PathBuf,
|
||||
inner: Option<SwapResourceInner>,
|
||||
}
|
||||
|
||||
impl SwapResource {
|
||||
async fn new_inner(
|
||||
core: Arc<Mutex<Core>>,
|
||||
runtime_path: PathBuf,
|
||||
size_percent: u64,
|
||||
create_threshold_secs: u64,
|
||||
mem: MemResource,
|
||||
agent: Arc<dyn Agent>,
|
||||
device_manager: Arc<RwLock<DeviceManager>>,
|
||||
) -> Result<Self> {
|
||||
check_mkswap().await.context("check_mkswap")?;
|
||||
|
||||
fs::create_dir_all(&runtime_path)
|
||||
.await
|
||||
.context(format!("fs::create_dir_all {:?}", &runtime_path))?;
|
||||
match fs::set_permissions(&runtime_path, Permissions::from_mode(0o700)).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
if let Err(e2) = fs::remove_dir_all(&runtime_path).await {
|
||||
error!(
|
||||
sl!(),
|
||||
"swap: fs::remove_dir_all {:?} fail: {:?}", &runtime_path, e2
|
||||
);
|
||||
}
|
||||
Err(anyhow!(
|
||||
"swap: set_permissions {:?} failed: {:?}",
|
||||
&runtime_path,
|
||||
e
|
||||
))
|
||||
}
|
||||
}?;
|
||||
|
||||
let (wake_tx, wake_rx) = mpsc::channel(1);
|
||||
|
||||
let mut st = SwapTask {
|
||||
path: runtime_path.clone(),
|
||||
size_percent: size_percent as usize,
|
||||
create_threshold_secs,
|
||||
core: core.clone(),
|
||||
wake_rx: Arc::new(Mutex::new(wake_rx)),
|
||||
mem,
|
||||
agent,
|
||||
device_manager,
|
||||
};
|
||||
let swap_task_handle = task::spawn(async move {
|
||||
info!(sl!(), "swap_task {:?} start", st.path);
|
||||
|
||||
loop {
|
||||
if st.wait_wake().await {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut error_retry_times = 0;
|
||||
let mut keep_run = true;
|
||||
|
||||
while keep_run {
|
||||
keep_run = false;
|
||||
|
||||
let need_remove = match st.run().await {
|
||||
Ok(_) => false,
|
||||
Err(e) => {
|
||||
error!(sl!(), "swap_task in {:?} fail: {:?}", st.path, e);
|
||||
if let Some(custom_error) = e.downcast_ref::<StStop>() {
|
||||
custom_error.need_remove
|
||||
} else {
|
||||
keep_run = true;
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
debug!(sl!(), "swap_task: run stop");
|
||||
|
||||
if need_remove {
|
||||
let swap_path = st.get_swap_path().await;
|
||||
if swap_path.exists() {
|
||||
if let Err(e) = fs::remove_file(&swap_path).await {
|
||||
error!(
|
||||
sl!(),
|
||||
"swap_task error handler remove_file {:?} fail: {:?}",
|
||||
swap_path,
|
||||
e
|
||||
);
|
||||
st.core.lock().await.update_next_swap_id();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if keep_run {
|
||||
error_retry_times += 1;
|
||||
if error_retry_times > ERROR_RETRY_TIMES_MAX {
|
||||
error!(sl!(), "swap_task {:?} error retry times exceed", st.path);
|
||||
keep_run = false;
|
||||
} else {
|
||||
sleep(Duration::from_secs(ERROR_WAIT_SECS)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(sl!(), "swap_task {:?} stop", st.path);
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime_path,
|
||||
inner: Some(SwapResourceInner {
|
||||
wake_tx,
|
||||
core,
|
||||
swap_task_handle: Arc::new(Mutex::new(swap_task_handle)),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn new(
|
||||
runtime_path: PathBuf,
|
||||
size_percent: u64,
|
||||
create_threshold_secs: u64,
|
||||
mem: MemResource,
|
||||
agent: Arc<dyn Agent>,
|
||||
device_manager: Arc<RwLock<DeviceManager>>,
|
||||
) -> Result<Self> {
|
||||
let core = Arc::new(Mutex::new(Core::new()));
|
||||
Self::new_inner(
|
||||
core,
|
||||
runtime_path,
|
||||
size_percent,
|
||||
create_threshold_secs,
|
||||
mem,
|
||||
agent,
|
||||
device_manager,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn restore(runtime_path: PathBuf) -> Self {
|
||||
Self {
|
||||
runtime_path,
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn wakeup_thread(&self) {
|
||||
if let Some(inner) = &self.inner {
|
||||
if let Err(e) = inner.wake_tx.try_send(()) {
|
||||
error!(sl!(), "swap wakeup_thread wake_tx try_send fail: {:?}", e);
|
||||
}
|
||||
} else {
|
||||
error!(sl!(), "swap wakeup_thread no inner");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update(&self) {
|
||||
self.wakeup_thread();
|
||||
}
|
||||
|
||||
async fn stop(&self) {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.core.lock().await.stop();
|
||||
}
|
||||
|
||||
self.wakeup_thread();
|
||||
|
||||
if let Some(inner) = &self.inner {
|
||||
let mut handle = inner.swap_task_handle.lock().await;
|
||||
let join_handle = std::mem::replace(&mut *handle, task::spawn(async {}));
|
||||
join_handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn clean(&self) {
|
||||
self.stop().await;
|
||||
|
||||
if let Err(e) = fs::remove_dir_all(&self.runtime_path).await {
|
||||
error!(
|
||||
sl!(),
|
||||
"swap fs::remove_dir_all {:?} fail: {:?}", self.runtime_path, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
@@ -22,12 +22,15 @@ use kata_types::mount::Mount;
|
||||
use oci::{Linux, LinuxCpu, LinuxResources};
|
||||
use oci_spec::runtime::{self as oci, LinuxDeviceType};
|
||||
use persist::sandbox_persist::Persist;
|
||||
use std::path::PathBuf;
|
||||
use tokio::{runtime, sync::RwLock};
|
||||
|
||||
use crate::{
|
||||
cdi_devices::{sort_options_by_pcipath, ContainerDevice, DeviceInfo},
|
||||
cgroups::{CgroupArgs, CgroupsResource},
|
||||
cpu_mem::{cpu::CpuResource, initial_size::InitialSizeManager, mem::MemResource},
|
||||
cpu_mem::{
|
||||
cpu::CpuResource, initial_size::InitialSizeManager, mem::MemResource, swap::SwapResource,
|
||||
},
|
||||
manager::ManagerArgs,
|
||||
network::{self, Network, NetworkConfig},
|
||||
resource_persist::ResourceState,
|
||||
@@ -51,6 +54,7 @@ pub(crate) struct ResourceManagerInner {
|
||||
pub cgroups_resource: CgroupsResource,
|
||||
pub cpu_resource: CpuResource,
|
||||
pub mem_resource: MemResource,
|
||||
pub swap_resource: Option<SwapResource>,
|
||||
}
|
||||
|
||||
impl ResourceManagerInner {
|
||||
@@ -66,16 +70,53 @@ impl ResourceManagerInner {
|
||||
let dev_manager = DeviceManager::new(hypervisor.clone(), topo_config.as_ref())
|
||||
.await
|
||||
.context("failed to create device manager")?;
|
||||
let device_manager = Arc::new(RwLock::new(dev_manager));
|
||||
|
||||
let cgroups_resource = CgroupsResource::new(sid, &toml_config)?;
|
||||
let cpu_resource = CpuResource::new(toml_config.clone())?;
|
||||
let mem_resource = MemResource::new(init_size_manager)?;
|
||||
let swap_resource = if hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.enable_guest_swap
|
||||
{
|
||||
let mut path = PathBuf::from(
|
||||
hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.guest_swap_path,
|
||||
);
|
||||
path.push(sid);
|
||||
Some(
|
||||
SwapResource::new(
|
||||
path,
|
||||
hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.guest_swap_size_percent,
|
||||
hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.guest_swap_create_threshold_secs,
|
||||
mem_resource.clone(),
|
||||
agent.clone(),
|
||||
device_manager.clone(),
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(Self {
|
||||
sid: sid.to_string(),
|
||||
toml_config,
|
||||
agent,
|
||||
hypervisor,
|
||||
device_manager: Arc::new(RwLock::new(dev_manager)),
|
||||
device_manager,
|
||||
network: None,
|
||||
share_fs: None,
|
||||
rootfs_resource: RootFsResource::new(),
|
||||
@@ -83,6 +124,7 @@ impl ResourceManagerInner {
|
||||
cgroups_resource,
|
||||
cpu_resource,
|
||||
mem_resource,
|
||||
swap_resource,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -248,6 +290,10 @@ impl ResourceManagerInner {
|
||||
self.handle_routes(network).await.context("handle routes")?;
|
||||
}
|
||||
|
||||
if let Some(swap) = self.swap_resource.as_ref() {
|
||||
swap.update().await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -444,6 +490,11 @@ impl ResourceManagerInner {
|
||||
.await
|
||||
.context("failed to cleanup host path")?;
|
||||
}
|
||||
|
||||
if let Some(swap) = self.swap_resource.as_ref() {
|
||||
swap.clean().await;
|
||||
}
|
||||
|
||||
// TODO cleanup other resources
|
||||
Ok(())
|
||||
}
|
||||
@@ -487,6 +538,10 @@ impl ResourceManagerInner {
|
||||
.update_cgroups(cid, linux_resources, op, self.hypervisor.as_ref())
|
||||
.await?;
|
||||
|
||||
if let Some(swap) = self.swap_resource.as_ref() {
|
||||
swap.update().await;
|
||||
}
|
||||
|
||||
// update the linux resources for agent
|
||||
self.agent_linux_resources(linux_resources)
|
||||
}
|
||||
@@ -544,13 +599,37 @@ impl Persist for ResourceManagerInner {
|
||||
};
|
||||
let topo_config = TopologyConfigInfo::new(&args.config);
|
||||
|
||||
let mem_resource = MemResource::default();
|
||||
let device_manager = Arc::new(RwLock::new(
|
||||
DeviceManager::new(resource_args.hypervisor.clone(), topo_config.as_ref()).await?,
|
||||
));
|
||||
|
||||
let swap_resource = if resource_args
|
||||
.hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.enable_guest_swap
|
||||
{
|
||||
let mut path = PathBuf::from(
|
||||
resource_args
|
||||
.hypervisor
|
||||
.hypervisor_config()
|
||||
.await
|
||||
.memory_info
|
||||
.guest_swap_path,
|
||||
);
|
||||
path.push(resource_args.sid.clone());
|
||||
Some(SwapResource::restore(path).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
sid: resource_args.sid,
|
||||
agent: resource_args.agent,
|
||||
hypervisor: resource_args.hypervisor.clone(),
|
||||
device_manager: Arc::new(RwLock::new(
|
||||
DeviceManager::new(resource_args.hypervisor, topo_config.as_ref()).await?,
|
||||
)),
|
||||
hypervisor: resource_args.hypervisor,
|
||||
device_manager,
|
||||
network: None,
|
||||
share_fs: None,
|
||||
rootfs_resource: RootFsResource::new(),
|
||||
@@ -562,7 +641,8 @@ impl Persist for ResourceManagerInner {
|
||||
.await?,
|
||||
toml_config: Arc::new(TomlConfig::default()),
|
||||
cpu_resource: CpuResource::default(),
|
||||
mem_resource: MemResource::default(),
|
||||
mem_resource,
|
||||
swap_resource,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user