feat(runtime-rs): support vcpu resizing on runtime side

Support vcpu resizing on the runtime side:
1. Calculate the vcpu number in resource_manager using all the containers'
   linux_resources in the spec.
2. Call the hypervisor (VMM) to do the vcpu resize.
3. Call the agent to online the vcpus.

Fixes: #5030
Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
Signed-off-by: Yushuo <y-shuo@linux.alibaba.com>
Ji-Xinyou 2023-02-15 22:02:09 +08:00 committed by Yushuo
parent 2988553305
commit fa6dff9f70
23 changed files with 348 additions and 28 deletions
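Taken together, the hunks below wire the resize into one flow: CpuResource recomputes the required vcpus from every container's linux_resources, the Hypervisor trait hotplugs or unplugs vcpus in the VMM, and the Agent onlines the added vcpus inside the guest. A distilled sketch of the public entry point (names taken from this commit; not verbatim code):

// The runtime calls this when a container is created, updated or deleted;
// internally it performs the three steps from the commit message above.
async fn on_container_spec_change(
    rm: &resource::ResourceManager,
    cid: &str,
    resources: Option<&oci::LinuxResources>,
) -> anyhow::Result<()> {
    rm.update_linux_resource(cid, resources, resource::ResourceUpdateOp::Update)
        .await
}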

View File

@@ -26,7 +26,7 @@ pub enum Error {
}
/// Assigned CPU resources for a Linux container.
#[derive(Default, Debug)]
#[derive(Clone, Default, Debug)]
pub struct LinuxContainerCpuResources {
shares: u64,
period: u64,

View File

@@ -13,7 +13,7 @@ use crate::Error;
///
/// The `U32Set` may be used to save CPUs parsed from a CPU list file or NUMA nodes parsed from
/// a NUMA node list file.
#[derive(Default, Debug)]
#[derive(Clone, Default, Debug)]
pub struct U32Set(Vec<u32>);
impl U32Set {

View File

@@ -3,3 +3,4 @@ members = [
"crates/shim",
"crates/shim-ctl",
]

View File

@@ -117,5 +117,6 @@ impl_agent!(
get_ip_tables | crate::GetIPTablesRequest | crate::GetIPTablesResponse | None,
set_ip_tables | crate::SetIPTablesRequest | crate::SetIPTablesResponse | None,
get_volume_stats | crate::VolumeStatsRequest | crate::VolumeStatsResponse | None,
resize_volume | crate::ResizeVolumeRequest | crate::Empty | None
resize_volume | crate::ResizeVolumeRequest | crate::Empty | None,
online_cpu_mem | crate::OnlineCPUMemRequest | crate::Empty | None
);

View File

@@ -54,6 +54,7 @@ pub trait Agent: AgentManager + HealthService + Send + Sync {
// sandbox
async fn create_sandbox(&self, req: CreateSandboxRequest) -> Result<Empty>;
async fn destroy_sandbox(&self, req: Empty) -> Result<Empty>;
async fn online_cpu_mem(&self, req: OnlineCPUMemRequest) -> Result<Empty>;
// network
async fn add_arp_neighbors(&self, req: AddArpNeighborRequest) -> Result<Empty>;
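With the RPC in both the impl_agent! table and the Agent trait, onlining hotplugged vcpus becomes a single call. A minimal sketch using the same request fields this commit passes later from CpuResource (the helper itself is hypothetical):

use agent::{Agent, OnlineCPUMemRequest};

// Online two freshly hotplugged vcpus in the guest (sketch, not from the commit).
async fn online_two_vcpus(agent: &dyn Agent) -> anyhow::Result<()> {
    agent
        .online_cpu_mem(OnlineCPUMemRequest {
            wait: false,    // do not block waiting for the guest
            nb_cpus: 2,     // number of vcpus to online
            cpu_only: true, // leave memory onlining untouched
        })
        .await?;
    Ok(())
}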

View File

@@ -32,7 +32,7 @@ kata-types = { path = "../../../libs/kata-types" }
logging = { path = "../../../libs/logging" }
shim-interface = { path = "../../../libs/shim-interface" }
dragonball = { path = "../../../dragonball", features = ["atomic-guest-memory", "virtio-vsock", "hotplug", "virtio-blk", "virtio-net", "virtio-fs","dbs-upcall"] }
dragonball = { path = "../../../dragonball", features = ["atomic-guest-memory", "virtio-vsock", "hotplug", "virtio-blk", "virtio-net", "virtio-fs", "dbs-upcall"] }
ch-config = { path = "ch-config", optional = true }

View File

@@ -13,7 +13,7 @@ use crate::{
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use dragonball::{
api::v1::{BlockDeviceConfigInfo, BootSourceConfig},
api::v1::{BlockDeviceConfigInfo, BootSourceConfig, VcpuResizeInfo},
vm::VmConfigInfo,
};
use kata_sys_util::mount;
@@ -327,6 +327,53 @@ impl DragonballInner {
}
}
// check whether the resize request is valid
// an error here cannot be tolerated: the container boot will fail
fn precheck_resize_vcpus(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> {
let current_vcpus = old_vcpus;
// a positive vcpu count is required
if new_vcpus == 0 {
return Err(anyhow!("resize vcpu error: 0 vcpu resizing is invalid"));
}
// cannot exceed maximum value
if new_vcpus > self.config.cpu_info.default_maxvcpus {
return Err(anyhow!("resize vcpu error: cannot greater than maxvcpus"));
}
Ok((current_vcpus, new_vcpus))
}
// check the resize request first, then do the resize; returns Result<(old, new)>
pub(crate) async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> {
if old_vcpus == new_vcpus {
info!(
sl!(),
"resize_vcpu: no need to resize vcpus because old_vcpus is equal to new_vcpus"
);
return Ok((new_vcpus, new_vcpus));
}
let (old_vcpus, new_vcpus) = self.precheck_resize_vcpus(old_vcpus, new_vcpus)?;
info!(
sl!(),
"check_resize_vcpus passed, passing new_vcpus = {:?} to vmm", new_vcpus
);
let cpu_resize_info = VcpuResizeInfo {
vcpu_count: Some(new_vcpus as u8),
};
self.vmm_instance
.resize_vcpu(&cpu_resize_info)
.context(format!(
"failed to do_resize_vcpus on new_vcpus={:?}",
new_vcpus
))?;
Ok((old_vcpus, new_vcpus))
}
pub fn set_hypervisor_config(&mut self, config: HypervisorConfig) {
self.config = config;
}
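The contract above can be read off directly: equal counts return early, zero is rejected, anything above the configured maximum is rejected, and otherwise the new count is handed to the VMM. Hypothetical expectations, assuming default_maxvcpus = 8:

// resize_vcpu(4, 4)  -> Ok((4, 4))   early return, nothing sent to the VMM
// resize_vcpu(4, 0)  -> Err(..)      zero vcpus is invalid
// resize_vcpu(4, 16) -> Err(..)      exceeds default_maxvcpus
// resize_vcpu(4, 6)  -> Ok((4, 6))   sends VcpuResizeInfo { vcpu_count: Some(6) }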

View File

@@ -77,6 +77,12 @@ impl Hypervisor for Dragonball {
inner.save_vm().await
}
// returns Result<(old_vcpus, new_vcpus)>
async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> {
let inner = self.inner.read().await;
inner.resize_vcpu(old_vcpus, new_vcpus).await
}
async fn add_device(&self, device: DeviceType) -> Result<()> {
let mut inner = self.inner.write().await;
inner.add_device(device).await

View File

@@ -16,8 +16,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use dragonball::{
api::v1::{
BlockDeviceConfigInfo, BootSourceConfig, FsDeviceConfigInfo, FsMountConfigInfo,
InstanceInfo, InstanceState, VirtioNetDeviceConfigInfo, VmmAction, VmmActionError, VmmData,
VmmRequest, VmmResponse, VmmService, VsockDeviceConfigInfo,
InstanceInfo, InstanceState, VcpuResizeInfo, VirtioNetDeviceConfigInfo, VmmAction,
VmmActionError, VmmData, VmmRequest, VmmResponse, VmmService, VsockDeviceConfigInfo,
},
vm::VmConfigInfo,
Vmm,
@@ -248,6 +248,12 @@ impl VmmInstance {
Ok(())
}
pub fn resize_vcpu(&self, cfg: &VcpuResizeInfo) -> Result<()> {
self.handle_request(Request::Sync(VmmAction::ResizeVcpu(cfg.clone())))
.with_context(|| format!("Failed to resize_vm(hotplug vcpu), cfg: {:?}", cfg))?;
Ok(())
}
pub fn pause(&self) -> Result<()> {
todo!()
}

View File

@@ -78,6 +78,7 @@ pub trait Hypervisor: Send + Sync {
async fn pause_vm(&self) -> Result<()>;
async fn save_vm(&self) -> Result<()>;
async fn resume_vm(&self) -> Result<()>;
async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)>; // returns (old_vcpus, new_vcpus)
// device manager
async fn add_device(&self, device: DeviceType) -> Result<()>;

View File

@@ -104,6 +104,11 @@ impl QemuInner {
todo!()
}
pub(crate) async fn resize_vcpu(&self, _old_vcpus: u32, _new_vcpus: u32) -> Result<(u32, u32)> {
info!(sl!(), "QemuInner::resize_vcpu()");
todo!()
}
pub(crate) async fn get_pids(&self) -> Result<Vec<u32>> {
info!(sl!(), "QemuInner::get_pids()");
todo!()

View File

@@ -118,6 +118,11 @@ impl Hypervisor for Qemu {
inner.cleanup().await
}
async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> {
let inner = self.inner.read().await;
inner.resize_vcpu(old_vcpus, new_vcpus).await
}
async fn get_pids(&self) -> Result<Vec<u32>> {
let inner = self.inner.read().await;
inner.get_pids().await

View File

@@ -0,0 +1,188 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
cmp,
collections::{HashMap, HashSet},
convert::TryFrom,
sync::Arc,
};
use agent::{Agent, OnlineCPUMemRequest};
use anyhow::{Context, Ok, Result};
use hypervisor::Hypervisor;
use kata_types::{config::TomlConfig, cpu::LinuxContainerCpuResources};
use oci::LinuxCpu;
use tokio::sync::RwLock;
use crate::ResourceUpdateOp;
#[derive(Default, Debug, Clone)]
pub struct CpuResource {
/// Current number of vCPUs
pub(crate) current_vcpu: Arc<RwLock<u32>>,
/// Default number of vCPUs
pub(crate) default_vcpu: u32,
/// CpuResource of each container
pub(crate) container_cpu_resources: Arc<RwLock<HashMap<String, LinuxContainerCpuResources>>>,
}
impl CpuResource {
pub fn new(config: Arc<TomlConfig>) -> Result<Self> {
let hypervisor_name = config.runtime.hypervisor_name.clone();
let hypervisor_config = config
.hypervisor
.get(&hypervisor_name)
.context(format!("failed to get hypervisor {}", hypervisor_name))?;
Ok(Self {
current_vcpu: Arc::new(RwLock::new(hypervisor_config.cpu_info.default_vcpus as u32)),
default_vcpu: hypervisor_config.cpu_info.default_vcpus as u32,
container_cpu_resources: Arc::new(RwLock::new(HashMap::new())),
})
}
pub(crate) async fn update_cpu_resources(
&self,
cid: &str,
linux_cpus: Option<&LinuxCpu>,
op: ResourceUpdateOp,
hypervisor: &dyn Hypervisor,
agent: &dyn Agent,
) -> Result<()> {
self.update_container_cpu_resources(cid, linux_cpus, op)
.await
.context("update container cpu resources")?;
let vcpu_required = self
.calc_cpu_resources()
.await
.context("calculate vcpus required")?;
if vcpu_required == self.current_vcpu().await {
return Ok(());
}
let curr_vcpus = self
.do_update_cpu_resources(vcpu_required, op, hypervisor, agent)
.await?;
self.update_current_vcpu(curr_vcpus).await;
Ok(())
}
async fn current_vcpu(&self) -> u32 {
let current_vcpu = self.current_vcpu.read().await;
*current_vcpu
}
async fn update_current_vcpu(&self, new_vcpus: u32) {
let mut current_vcpu = self.current_vcpu.write().await;
*current_vcpu = new_vcpus;
}
// update container_cpu_resources field
async fn update_container_cpu_resources(
&self,
cid: &str,
linux_cpus: Option<&LinuxCpu>,
op: ResourceUpdateOp,
) -> Result<()> {
if let Some(cpu) = linux_cpus {
let container_resource = LinuxContainerCpuResources::try_from(cpu)?;
let mut resources = self.container_cpu_resources.write().await;
match op {
ResourceUpdateOp::Add => {
resources.insert(cid.to_owned(), container_resource);
}
ResourceUpdateOp::Update => {
let resource = resources.insert(cid.to_owned(), container_resource.clone());
if let Some(old_container_resource) = resource {
// cpu-quota takes priority over cpuset when determining the number of vcpus,
// so we ignore a resource update that only changes the cpuset if cpu-quota
// has been set previously.
if old_container_resource.quota() > 0 && container_resource.quota() < 0 {
resources.insert(cid.to_owned(), old_container_resource);
}
}
}
ResourceUpdateOp::Del => {
resources.remove(cid);
}
}
}
Ok(())
}
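// A hypothetical sequence illustrating the quota-over-cpuset rule above:
//   Add:    quota=200000, period=100000     -> entry stored (quota-based)
//   Update: quota unset (-1), cpuset="0-3"  -> ignored, the old entry is kept
//   Update: quota=300000, period=100000     -> entry replaced (quota-based)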
// calculates the total required vcpus by adding each container's requirements within the pod
async fn calc_cpu_resources(&self) -> Result<u32> {
let mut total_vcpu = 0;
let mut cpuset_vcpu: HashSet<u32> = HashSet::new();
let resources = self.container_cpu_resources.read().await;
for (_, cpu_resource) in resources.iter() {
let vcpu = cpu_resource.get_vcpus().unwrap_or(0) as u32;
cpuset_vcpu.extend(cpu_resource.cpuset().iter());
total_vcpu += vcpu;
}
// constrained only by cpuset
if total_vcpu == 0 && !cpuset_vcpu.is_empty() {
info!(sl!(), "(from cpuset)get vcpus # {:?}", cpuset_vcpu);
return Ok(cpuset_vcpu.len() as u32);
}
info!(
sl!(),
"(from cfs_quota&cfs_period)get vcpus count {}", total_vcpu
);
Ok(total_vcpu)
}
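// Worked example (assuming get_vcpus() rounds quota/period up):
//   container A: quota=150000, period=100000 -> ceil(1.5) = 2 vcpus
//   container B: quota=50000,  period=100000 -> ceil(0.5) = 1 vcpu
//   => total_vcpu = 3; the cpuset fallback applies only when total_vcpu == 0.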
// do the vcpu hotplug or hot-unplug
async fn do_update_cpu_resources(
&self,
new_vcpus: u32,
op: ResourceUpdateOp,
hypervisor: &dyn Hypervisor,
agent: &dyn Agent,
) -> Result<u32> {
let old_vcpus = self.current_vcpu().await;
// when adding vcpus, ignore old_vcpus > new_vcpus
// when deleting vcpus, ignore old_vcpus < new_vcpus
if (op == ResourceUpdateOp::Add && old_vcpus > new_vcpus)
|| (op == ResourceUpdateOp::Del && old_vcpus < new_vcpus)
{
return Ok(old_vcpus);
}
// do not reduce computing power:
// the number of vcpus must not drop below the default size
let new_vcpus = cmp::max(new_vcpus, self.default_vcpu);
let (old, new) = hypervisor
.resize_vcpu(old_vcpus, new_vcpus)
.await
.context("resize vcpus")?;
if old < new {
let add = new - old;
info!(sl!(), "request to onlineCpuMem with {:?} cpus", add);
agent
.online_cpu_mem(OnlineCPUMemRequest {
wait: false,
nb_cpus: add,
cpu_only: true,
})
.await
.context("online vcpus")?;
}
Ok(new)
}
}
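The gating at the top of do_update_cpu_resources means an Add can only grow the vcpu count, a Del can only shrink it, and the target is floored at the configured default. A minimal pure-function sketch of that rule (a hypothetical distillation, not part of the commit):

// Returns the vcpu count the sandbox should end up with.
fn target_vcpus(op: ResourceUpdateOp, old: u32, requested: u32, default: u32) -> u32 {
    if (op == ResourceUpdateOp::Add && old > requested)
        || (op == ResourceUpdateOp::Del && old < requested)
    {
        return old; // ignore requests that move opposite to the operation
    }
    requested.max(default) // never drop below the default vcpu count
}

// e.g. target_vcpus(ResourceUpdateOp::Del, 4, 1, 2) == 2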

View File

@@ -0,0 +1,7 @@
// Copyright (c) 2019-2023 Alibaba Cloud
// Copyright (c) 2019-2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
pub mod cpu;

View File

@@ -22,6 +22,7 @@ pub mod rootfs;
pub mod share_fs;
pub mod volume;
pub use manager::ResourceManager;
pub mod cpu_mem;
use kata_types::config::hypervisor::SharedFsInfo;
@@ -30,3 +31,10 @@ pub enum ResourceConfig {
Network(NetworkConfig),
ShareFs(SharedFsInfo),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResourceUpdateOp {
Add,
Del,
Update,
}

View File

@@ -4,9 +4,8 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::network::NetworkConfig;
use crate::resource_persist::ResourceState;
use crate::{manager_inner::ResourceManagerInner, rootfs::Rootfs, volume::Volume, ResourceConfig};
use std::sync::Arc;
use agent::types::Device;
use agent::{Agent, Storage};
use anyhow::Result;
@@ -17,9 +16,13 @@ use kata_types::config::TomlConfig;
use kata_types::mount::Mount;
use oci::{Linux, LinuxResources};
use persist::sandbox_persist::Persist;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::network::NetworkConfig;
use crate::resource_persist::ResourceState;
use crate::ResourceUpdateOp;
use crate::{manager_inner::ResourceManagerInner, rootfs::Rootfs, volume::Volume, ResourceConfig};
pub struct ManagerArgs {
pub sid: String,
pub agent: Arc<dyn Agent>,
@@ -119,6 +122,16 @@ impl ResourceManager {
inner.update_cgroups(cid, linux_resources).await
}
pub async fn update_linux_resource(
&self,
cid: &str,
linux_resources: Option<&LinuxResources>,
op: ResourceUpdateOp,
) -> Result<()> {
let inner = self.inner.read().await;
inner.update_linux_resource(cid, linux_resources, op).await
}
pub async fn cleanup(&self) -> Result<()> {
let inner = self.inner.read().await;
inner.cleanup().await

View File

@@ -4,13 +4,11 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::{sync::Arc, thread, vec};
use std::{sync::Arc, thread};
use crate::{network::NetworkConfig, resource_persist::ResourceState};
use agent::{types::Device, Agent, Storage};
use anyhow::{anyhow, Context, Ok, Result};
use async_trait::async_trait;
use hypervisor::{
device::{
device_manager::{do_handle_device, DeviceManager},
@@ -20,18 +18,20 @@ use hypervisor::{
};
use kata_types::config::TomlConfig;
use kata_types::mount::Mount;
use oci::{Linux, LinuxResources};
use oci::{Linux, LinuxCpu, LinuxResources};
use persist::sandbox_persist::Persist;
use tokio::{runtime, sync::RwLock};
use crate::{
cgroups::{CgroupArgs, CgroupsResource},
cpu_mem::cpu::CpuResource,
manager::ManagerArgs,
network::{self, Network},
network::{self, Network, NetworkConfig},
resource_persist::ResourceState,
rootfs::{RootFsResource, Rootfs},
share_fs::{self, sandbox_bind_mounts::SandboxBindMounts, ShareFs},
volume::{Volume, VolumeResource},
ResourceConfig,
ResourceConfig, ResourceUpdateOp,
};
pub(crate) struct ResourceManagerInner {
@@ -46,6 +46,7 @@ pub(crate) struct ResourceManagerInner {
pub rootfs_resource: RootFsResource,
pub volume_resource: VolumeResource,
pub cgroups_resource: CgroupsResource,
pub cpu_resource: CpuResource,
}
impl ResourceManagerInner {
@@ -55,12 +56,12 @@ impl ResourceManagerInner {
hypervisor: Arc<dyn Hypervisor>,
toml_config: Arc<TomlConfig>,
) -> Result<Self> {
let cgroups_resource = CgroupsResource::new(sid, &toml_config)?;
// create device manager
let dev_manager =
DeviceManager::new(hypervisor.clone()).context("failed to create device manager")?;
let cgroups_resource = CgroupsResource::new(sid, &toml_config)?;
let cpu_resource = CpuResource::new(toml_config.clone())?;
Ok(Self {
sid: sid.to_string(),
toml_config,
@@ -72,6 +73,7 @@ impl ResourceManagerInner {
rootfs_resource: RootFsResource::new(),
volume_resource: VolumeResource::new(),
cgroups_resource,
cpu_resource,
})
}
@@ -354,6 +356,26 @@ impl ResourceManagerInner {
self.rootfs_resource.dump().await;
self.volume_resource.dump().await;
}
pub async fn update_linux_resource(
&self,
cid: &str,
linux_resources: Option<&LinuxResources>,
op: ResourceUpdateOp,
) -> Result<()> {
let linux_cpus = || -> Option<&LinuxCpu> { linux_resources.as_ref()?.cpu.as_ref() }();
self.cpu_resource
.update_cpu_resources(
cid,
linux_cpus,
op,
self.hypervisor.as_ref(),
self.agent.as_ref(),
)
.await?;
Ok(())
}
}
#[async_trait]
@@ -400,6 +422,7 @@ impl Persist for ResourceManagerInner {
)
.await?,
toml_config: Arc::new(TomlConfig::default()),
cpu_resource: CpuResource::default(),
})
}
}

View File

@@ -3,6 +3,7 @@
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use anyhow::Result;

View File

@@ -26,12 +26,10 @@ pub trait Sandbox: Send + Sync {
async fn cleanup(&self) -> Result<()>;
async fn shutdown(&self) -> Result<()>;
// agent function
async fn agent_sock(&self) -> Result<String>;
// utils
async fn set_iptables(&self, is_ipv6: bool, data: Vec<u8>) -> Result<Vec<u8>>;
async fn get_iptables(&self, is_ipv6: bool) -> Result<Vec<u8>>;
async fn direct_volume_stats(&self, volume_path: &str) -> Result<String>;
async fn direct_volume_resize(&self, resize_req: agent::ResizeVolumeRequest) -> Result<()>;
async fn agent_sock(&self) -> Result<String>;
}

View File

@@ -6,7 +6,6 @@
use std::{path::PathBuf, str::from_utf8, sync::Arc};
use crate::{shim_mgmt::server::MgmtServer, static_resource::StaticResourceManager};
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
@@ -18,12 +17,11 @@ use kata_sys_util::spec::load_oci_spec;
use kata_types::{
annotations::Annotation, config::default::DEFAULT_GUEST_DNS_FILE, config::TomlConfig,
};
use netns_rs::NetNs;
use resource::network::generate_netns_name;
#[cfg(feature = "linux")]
use linux_container::LinuxContainer;
use netns_rs::NetNs;
use persist::sandbox_persist::Persist;
use resource::network::generate_netns_name;
use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER;
use tokio::fs;
use tokio::sync::{mpsc::Sender, RwLock};
@@ -36,6 +34,9 @@ use virt_container::{
#[cfg(feature = "wasm")]
use wasm_container::WasmContainer;
use crate::shim_mgmt::server::MgmtServer;
use crate::static_resource::StaticResourceManager;
struct RuntimeHandlerManagerInner {
id: String,
msg_sender: Sender<Message>,

View File

@@ -42,6 +42,7 @@ pub struct Container {
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
logger: slog::Logger,
pub linux_resources: Option<LinuxResources>,
}
impl Container {
@@ -51,6 +52,7 @@ impl Container {
spec: oci::Spec,
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
linux_resources: Option<LinuxResources>,
) -> Result<Self> {
let container_id = ContainerID::new(&config.container_id).context("new container id")?;
let logger = sl!().new(o!("container_id" => config.container_id.clone()));
@@ -78,6 +80,7 @@ impl Container {
agent,
resource_manager,
logger,
linux_resources,
})
}

View File

@@ -6,6 +6,7 @@
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use std::{collections::HashMap, sync::Arc};
use agent::Agent;
@@ -59,12 +60,17 @@ impl VirtContainerManager {
#[async_trait]
impl ContainerManager for VirtContainerManager {
async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
let linux_resources = match spec.linux.clone() {
Some(linux) => linux.resources,
_ => None,
};
let container = Container::new(
self.pid,
config.clone(),
spec.clone(),
self.agent.clone(),
self.resource_manager.clone(),
linux_resources,
)
.context("new container")?;
@@ -96,7 +102,6 @@ impl ContainerManager for VirtContainerManager {
let mut containers = self.containers.write().await;
container.create(spec).await.context("create")?;
containers.insert(container.container_id.to_string(), container);
Ok(PID { pid: self.pid })
}

View File

@@ -361,7 +361,7 @@ impl Sandbox for VirtSandbox {
.await
.context("resource clean up")?;
// TODO: cleanup other snadbox resource
// TODO: cleanup other sandbox resource
Ok(())
}