diff --git a/docs/Limitations.md b/docs/Limitations.md
index d122e5ca50..74a6acf2d0 100644
--- a/docs/Limitations.md
+++ b/docs/Limitations.md
@@ -147,7 +147,8 @@ these commands is potentially challenging. See issue
 https://github.com/clearcontainers/runtime/issues/341 and
 [the constraints challenge](#the-constraints-challenge) for more information.
 For CPUs resource management see
-[CPU constraints](design/vcpu-handling.md).
+[CPU constraints (in runtime-go)](design/vcpu-handling-runtime-go.md) and
+[CPU constraints (in runtime-rs)](design/vcpu-handling-runtime-rs.md).

 # Architectural limitations
diff --git a/docs/design/README.md b/docs/design/README.md
index 0c732defd1..2fe93b5f6e 100644
--- a/docs/design/README.md
+++ b/docs/design/README.md
@@ -6,7 +6,8 @@ Kata Containers design documents:
 - [API Design of Kata Containers](kata-api-design.md)
 - [Design requirements for Kata Containers](kata-design-requirements.md)
 - [VSocks](VSocks.md)
-- [VCPU handling](vcpu-handling.md)
+- [VCPU handling (in runtime-go)](vcpu-handling-runtime-go.md)
+- [VCPU handling (in runtime-rs)](vcpu-handling-runtime-rs.md)
 - [VCPU threads pinning](vcpu-threads-pinning.md)
 - [Host cgroups](host-cgroups.md)
 - [Agent systemd cgroup](agent-systemd-cgroup.md)
diff --git a/docs/design/VSocks.md b/docs/design/VSocks.md
index 0271645c2b..9375f30f59 100644
--- a/docs/design/VSocks.md
+++ b/docs/design/VSocks.md
@@ -78,4 +78,4 @@ with the containers is if the VM itself or the `containerd-shim-kata-v2` dies, i
 the containers are removed automatically.

 [1]: https://wiki.qemu.org/Features/VirtioVsock
-[2]: ./vcpu-handling.md#virtual-cpus-and-kubernetes-pods
+[2]: ./vcpu-handling-runtime-go.md#virtual-cpus-and-kubernetes-pods
diff --git a/docs/design/vcpu-handling.md b/docs/design/vcpu-handling-runtime-go.md
similarity index 100%
rename from docs/design/vcpu-handling.md
rename to docs/design/vcpu-handling-runtime-go.md
diff --git a/docs/design/vcpu-handling-runtime-rs.md b/docs/design/vcpu-handling-runtime-rs.md
new file mode 100644
index 0000000000..44989ce4e6
--- /dev/null
+++ b/docs/design/vcpu-handling-runtime-rs.md
@@ -0,0 +1,51 @@
+# Virtual machine vCPU sizing in Kata Containers 3.0
+
+> Preview:
+> [Kubernetes (since 1.23)][1] and [containerd (since 1.6.0-beta4)][2] help calculate the `Sandbox Size` info and pass it to Kata Containers through annotations.
+> To adapt to this beneficial change while staying compatible with older runtimes, we have implemented a new vCPU handling approach in `runtime-rs`, which differs slightly from the original `runtime-go` design.
+
+## When do we need to handle vCPU size?
+vCPU sizing should be determined by the container workloads, so throughout the life cycle of Kata Containers there are several points at which we need to decide how many vCPUs the VM should have. These are mainly `CreateVM`, `CreateContainer`, `UpdateContainer`, and `DeleteContainer`:
+* `CreateVM`: When creating a sandbox, we need to know how many vCPUs to start the VM with.
+* `CreateContainer`: When creating a new container in the VM, we may need to hot-plug vCPUs according to the requirements in the container's spec.
+* `UpdateContainer`: When receiving an `UpdateContainer` request, we may need to update the vCPU resources according to the new requirements of the container.
+* `DeleteContainer`: When a container is removed from the VM, we may need to hot-unplug vCPUs to reclaim the vCPU resources introduced by the container.
+
+## On what basis do we calculate the number of vCPUs?
+When Kata calculates the number of vCPUs, it has three data sources: the `default_vcpus` and `default_maxvcpus` values specified in the configuration file (named `TomlConfig` later in the doc), the `io.kubernetes.cri.sandbox-cpu-quota` and `io.kubernetes.cri.sandbox-cpu-period` annotations passed by the upper-layer runtime, and the CPU resources declared in the container's spec when `CreateContainer`/`UpdateContainer`/`DeleteContainer` is requested.
+
+Our understanding of these sources, and their priority, is as follows; this affects how we calculate the number of vCPUs later.
+
+* From `TomlConfig`:
+  * `default_vcpus`: default number of vCPUs when starting a VM.
+  * `default_maxvcpus`: maximum number of vCPUs.
+* From `Annotation`:
+  * `InitialSize`: we call the resource size passed through the annotations the `InitialSize`. Kubernetes calculates the sandbox size from the Pod's declaration, and that is the `InitialSize` here. This is the size we want to prioritize.
+* From `Container Spec`:
+  * The amount of CPU resources the container wants to use is declared in its spec. Besides the aforementioned annotations, we mainly consider `cpu quota` and `cpuset` when calculating the number of vCPUs.
+    * `cpu quota`: `cpu quota` is the most common way to declare the amount of CPU resources. The number of vCPUs introduced by the `cpu quota` declared in a container's spec is: `vCPUs = ceiling( quota / period )`.
+    * `cpuset`: `cpuset` is often used to bind the CPUs that tasks can run on. The number of vCPUs that may be introduced by the `cpuset` declared in a container's spec is the number of CPUs specified in the set that do not overlap with those of other containers.
+
+
+## How to calculate and adjust the vCPU size
+There are two vCPU sizes we need to consider: the number of vCPUs when starting the VM (named `Boot Size` in the doc), and the number of vCPUs when a `CreateContainer`/`UpdateContainer`/`DeleteContainer` request is received (`Real-time Size` in the doc).

+### `Boot Size`
+The main considerations are `InitialSize` and `default_vcpus`, with the following principles:
+`InitialSize` has priority over the `default_vcpus` declared in `TomlConfig`.
+1. When such an annotation is present, the original `default_vcpus` is replaced by the number of vCPUs in the `InitialSize`, which becomes the `Boot Size`. (Because not all runtimes support this annotation for the time being, we still keep `default_vcpus` in `TomlConfig`.)
+2. When the specs of all containers are aggregated to calculate the sandbox size, the method is consistent with the `InitialSize` calculation here.
+
+### `Real-time Size`
+When we receive an OCI request, it may be for a single container, but what we must consider is the number of vCPUs for the entire VM. So we maintain a list of per-container resources; every time an adjustment is requested, the entire list is traversed to calculate the new number of vCPUs. In addition, the following principles apply:
+1. Do not cut computing power, and try to keep the number of vCPUs specified by `InitialSize`.
+   * The resulting number of vCPUs will therefore never be less than the `Boot Size`.
+2. `cpu quota` takes precedence over `cpuset`, and the setting history is taken into account.
+   * We think quota describes the CPU time slice that a cgroup can use, and `cpuset` describes the actual CPUs that a cgroup can run on.
Quota better describes the size of the CPU time slice that a cgroup actually wants to use, while `cpuset` only describes which CPUs the cgroup may run on: a cgroup can be allowed to run on the specified CPUs yet consume only a small time slice on them, so quota takes precedence over `cpuset`.
+   * On the one hand, when both `cpu quota` and `cpuset` are specified, we calculate the number of vCPUs based on `cpu quota` and ignore `cpuset`. On the other hand, if `cpu quota` was used to control the number of vCPUs in the past, and only `cpuset` is updated during `UpdateContainer`, we do not adjust the number of vCPUs at this time.
+3. `StaticSandboxResourceMgmt` controls hotplug.
+   * Some VMMs, and the kernels of some architectures, do not support hotplugging. We can accommodate this situation through `StaticSandboxResourceMgmt`. When `StaticSandboxResourceMgmt = true` is set, we make no further attempts to update the number of vCPUs after booting.
+
+
+[1]: https://github.com/kubernetes/kubernetes/pull/104886
+[2]: https://github.com/containerd/containerd/pull/6155
diff --git a/src/agent/src/linux_abi.rs b/src/agent/src/linux_abi.rs
index 042acd0aed..de131faf02 100644
--- a/src/agent/src/linux_abi.rs
+++ b/src/agent/src/linux_abi.rs
@@ -81,7 +81,8 @@ cfg_if! {
 // sysfs as directories in the subtree under /sys/devices/LNXSYSTM:00
 pub const ACPI_DEV_PATH: &str = "/devices/LNXSYSTM";

-pub const SYSFS_CPU_ONLINE_PATH: &str = "/sys/devices/system/cpu";
+pub const SYSFS_CPU_PATH: &str = "/sys/devices/system/cpu";
+pub const SYSFS_CPU_ONLINE_PATH: &str = "/sys/devices/system/cpu/online";

 pub const SYSFS_MEMORY_BLOCK_SIZE_PATH: &str = "/sys/devices/system/memory/block_size_bytes";
 pub const SYSFS_MEMORY_HOTPLUG_PROBE_PATH: &str = "/sys/devices/system/memory/probe";
diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs
index 34275cc411..175304ed14 100644
--- a/src/agent/src/sandbox.rs
+++ b/src/agent/src/sandbox.rs
@@ -12,6 +12,7 @@ use crate::pci;
 use crate::uevent::{Uevent, UeventMatcher};
 use crate::watcher::BindWatcher;
 use anyhow::{anyhow, Context, Result};
+use kata_types::cpu::CpuSet;
 use libc::pid_t;
 use oci::{Hook, Hooks};
 use protocols::agent::OnlineCPUMemRequest;
@@ -25,6 +26,7 @@ use std::collections::HashMap;
 use std::fs;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::{thread, time};
 use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -263,12 +265,12 @@ impl Sandbox {
     pub fn online_cpu_memory(&self, req: &OnlineCPUMemRequest) -> Result<()> {
         if req.nb_cpus > 0 {
             // online cpus
-            online_cpus(&self.logger, req.nb_cpus as i32)?;
+            online_cpus(&self.logger, req.nb_cpus as i32).context("online cpus")?;
         }

         if !req.cpu_only {
             // online memory
-            online_memory(&self.logger)?;
+            online_memory(&self.logger).context("online memory")?;
         }

         if req.nb_cpus == 0 {
@@ -432,23 +434,33 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Result<i32> {
 // max wait for all CPUs to online will use 50 * 100 = 5 seconds.
 const ONLINE_CPUMEM_WATI_MILLIS: u64 = 50;
-const ONLINE_CPUMEM_MAX_RETRIES: u32 = 100;
+const ONLINE_CPUMEM_MAX_RETRIES: i32 = 100;

 #[instrument]
 fn online_cpus(logger: &Logger, num: i32) -> Result<i32> {
-    let mut onlined_count: i32 = 0;
+    let mut onlined_cpu_count = onlined_cpus().context("onlined cpu count")?;
+    // some vmms, like dragonball, will online cpus for us,
+    // so check first whether the agent needs to do the online operation
+    if onlined_cpu_count >= num {
+        return Ok(num);
+    }

     for i in 0..ONLINE_CPUMEM_MAX_RETRIES {
-        let r = online_resources(
+        // online num resources
+        online_resources(
             logger,
-            SYSFS_CPU_ONLINE_PATH,
+            SYSFS_CPU_PATH,
             r"cpu[0-9]+",
-            num - onlined_count,
-        );
+            num - onlined_cpu_count,
+        )
+        .context("online cpu resource")?;

-        onlined_count += r?;
-        if onlined_count == num {
-            info!(logger, "online {} CPU(s) after {} retries", num, i);
+        onlined_cpu_count = onlined_cpus().context("onlined cpu count")?;
+        if onlined_cpu_count >= num {
+            info!(
+                logger,
+                "Currently {} onlined CPU(s) after {} retries", onlined_cpu_count, i
+            );
             return Ok(num);
         }
         thread::sleep(time::Duration::from_millis(ONLINE_CPUMEM_WATI_MILLIS));
@@ -463,10 +475,18 @@ fn online_cpus(logger: &Logger, num: i32) -> Result<i32> {

 #[instrument]
 fn online_memory(logger: &Logger) -> Result<()> {
-    online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)?;
+    online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1)
+        .context("online memory resource")?;
     Ok(())
 }

+fn onlined_cpus() -> Result<i32> {
+    let content =
+        fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?;
+    let online_cpu_set = CpuSet::from_str(content.trim())?;
+    Ok(online_cpu_set.len() as i32)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/libs/kata-types/src/cpu.rs b/src/libs/kata-types/src/cpu.rs
index 0020de097b..e47681f623 100644
--- a/src/libs/kata-types/src/cpu.rs
+++ b/src/libs/kata-types/src/cpu.rs
@@ -26,7 +26,7 @@ pub enum Error {
 }

 /// Assigned CPU resources for a Linux container.
-#[derive(Default, Debug)]
+#[derive(Clone, Default, Debug)]
 pub struct LinuxContainerCpuResources {
     shares: u64,
     period: u64,
diff --git a/src/libs/kata-types/src/utils/u32_set.rs b/src/libs/kata-types/src/utils/u32_set.rs
index 3742e4d54f..44c55a1639 100644
--- a/src/libs/kata-types/src/utils/u32_set.rs
+++ b/src/libs/kata-types/src/utils/u32_set.rs
@@ -13,7 +13,7 @@ use crate::Error;
 ///
 /// The `U32Set` may be used to save CPUs parsed from a CPU list file or NUMA nodes parsed from
 /// a NUMA node list file.
-#[derive(Default, Debug)]
+#[derive(Clone, Default, Debug)]
 pub struct U32Set(Vec<u32>);

 impl U32Set {
diff --git a/src/libs/protocols/protos/agent.proto b/src/libs/protocols/protos/agent.proto
index 3ad755256c..9ed7c0a049 100644
--- a/src/libs/protocols/protos/agent.proto
+++ b/src/libs/protocols/protos/agent.proto
@@ -366,7 +366,8 @@ message OnlineCPUMemRequest {
     // resources are connected asynchronously and the agent returns immediately.
     bool wait = 1;

-    // NbCpus specifies the number of CPUs that were added and the agent has to online.
+    // NbCpus specifies the number of CPUs that should be onlined in the guest.
+    // Special value 0 means agent will skip this check.
     uint32 nb_cpus = 2;

     // CpuOnly specifies whether only online CPU or not.
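Editor's note: the new `onlined_cpus()` helper above counts the currently online CPUs by reading `/sys/devices/system/cpu/online`, which holds a CPU list such as `0-3,5`, and delegating the parsing to `kata_types::cpu::CpuSet`. As a rough illustration of what that parsing amounts to, here is a hypothetical stand-alone sketch; it handles only plain ids and ranges and is not the real `CpuSet::from_str` implementation:

```rust
// Hypothetical sketch: count CPUs in a kernel CPU-list string like "0-3,5".
// The agent itself uses kata_types::cpu::CpuSet for this.
fn count_online_cpus(list: &str) -> Result<usize, std::num::ParseIntError> {
    let mut count = 0;
    for part in list.trim().split(',').filter(|p| !p.is_empty()) {
        match part.split_once('-') {
            // A range like "0-3" contributes end - start + 1 CPUs.
            Some((start, end)) => {
                let (s, e): (usize, usize) = (start.parse()?, end.parse()?);
                count += e - s + 1;
            }
            // A single id like "5" contributes one CPU.
            None => {
                part.parse::<usize>()?;
                count += 1;
            }
        }
    }
    Ok(count)
}

fn main() {
    // "0-3,5" names CPUs 0,1,2,3 and 5 => five CPUs online.
    assert_eq!(count_online_cpus("0-3,5"), Ok(5));
}
```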
diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 0b5cd7ca9f..358776fab0 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -655,11 +655,12 @@ dependencies = [ [[package]] name = "dbs-address-space" -version = "0.2.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bcc37dc0b8ffae1c5911d13ae630dc7a9020fa0de0edd178d6ab71daf56c8fc" +checksum = "95e20d28a9cd13bf00d0ecd1bd073d242242b04f0acb663d7adfc659f8879322" dependencies = [ "arc-swap", + "lazy_static", "libc", "nix 0.23.2", "thiserror", @@ -746,9 +747,9 @@ dependencies = [ [[package]] name = "dbs-upcall" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "699e62afa444ae4b00d474fd91bc37785ba050acdfbe179731c81898e32efc3f" +checksum = "ea3a78128fd0be8b8b10257675c262b378dc5d00b1e18157736a6c27e45ce4fb" dependencies = [ "anyhow", "dbs-utils", @@ -776,9 +777,9 @@ dependencies = [ [[package]] name = "dbs-virtio-devices" -version = "0.2.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88e5c6c48b766afb95851b04b6b193871a59d0b2a3ed19990d4f8f651ae5c668" +checksum = "24d671cc3e5f98b84ef6b6bed007d28f72f16d3aea8eb38e2d42b00b2973c1d8" dependencies = [ "byteorder", "caps", @@ -792,7 +793,7 @@ dependencies = [ "kvm-ioctls", "libc", "log", - "nix 0.23.2", + "nix 0.24.3", "nydus-api", "nydus-blobfs", "nydus-rafs", diff --git a/src/runtime-rs/Cargo.toml b/src/runtime-rs/Cargo.toml index 1859121459..bbc401f645 100644 --- a/src/runtime-rs/Cargo.toml +++ b/src/runtime-rs/Cargo.toml @@ -3,3 +3,4 @@ members = [ "crates/shim", "crates/shim-ctl", ] + diff --git a/src/runtime-rs/crates/agent/src/kata/agent.rs b/src/runtime-rs/crates/agent/src/kata/agent.rs index d06da15ea6..13ba4085da 100644 --- a/src/runtime-rs/crates/agent/src/kata/agent.rs +++ b/src/runtime-rs/crates/agent/src/kata/agent.rs @@ -117,5 +117,6 @@ impl_agent!( get_ip_tables | crate::GetIPTablesRequest | crate::GetIPTablesResponse | None, set_ip_tables | crate::SetIPTablesRequest | crate::SetIPTablesResponse | None, get_volume_stats | crate::VolumeStatsRequest | crate::VolumeStatsResponse | None, - resize_volume | crate::ResizeVolumeRequest | crate::Empty | None + resize_volume | crate::ResizeVolumeRequest | crate::Empty | None, + online_cpu_mem | crate::OnlineCPUMemRequest | crate::Empty | None ); diff --git a/src/runtime-rs/crates/agent/src/kata/trans.rs b/src/runtime-rs/crates/agent/src/kata/trans.rs index 172095ceb3..ca9c8f1720 100644 --- a/src/runtime-rs/crates/agent/src/kata/trans.rs +++ b/src/runtime-rs/crates/agent/src/kata/trans.rs @@ -336,7 +336,7 @@ impl From for agent::UpdateContainerRequest { fn from(from: UpdateContainerRequest) -> Self { Self { container_id: from.container_id, - resources: from_option(Some(from.resources)), + resources: from_option(from.resources), ..Default::default() } } diff --git a/src/runtime-rs/crates/agent/src/lib.rs b/src/runtime-rs/crates/agent/src/lib.rs index ea3bab78f8..43449ca591 100644 --- a/src/runtime-rs/crates/agent/src/lib.rs +++ b/src/runtime-rs/crates/agent/src/lib.rs @@ -54,6 +54,7 @@ pub trait Agent: AgentManager + HealthService + Send + Sync { // sandbox async fn create_sandbox(&self, req: CreateSandboxRequest) -> Result; async fn destroy_sandbox(&self, req: Empty) -> Result; + async fn online_cpu_mem(&self, req: OnlineCPUMemRequest) -> Result; // network async fn add_arp_neighbors(&self, req: AddArpNeighborRequest) -> Result; diff --git 
a/src/runtime-rs/crates/agent/src/types.rs b/src/runtime-rs/crates/agent/src/types.rs
index da6e14430b..1ba7efd5e3 100644
--- a/src/runtime-rs/crates/agent/src/types.rs
+++ b/src/runtime-rs/crates/agent/src/types.rs
@@ -201,7 +201,7 @@ pub struct ListProcessesRequest {
 #[derive(PartialEq, Clone, Default)]
 pub struct UpdateContainerRequest {
     pub container_id: String,
-    pub resources: oci::LinuxResources,
+    pub resources: Option<oci::LinuxResources>,
     pub mounts: Vec<oci::Mount>,
 }
diff --git a/src/runtime-rs/crates/hypervisor/Cargo.toml b/src/runtime-rs/crates/hypervisor/Cargo.toml
index eb613aad3c..6a61faea45 100644
--- a/src/runtime-rs/crates/hypervisor/Cargo.toml
+++ b/src/runtime-rs/crates/hypervisor/Cargo.toml
@@ -32,7 +32,7 @@ kata-types = { path = "../../../libs/kata-types" }
 logging = { path = "../../../libs/logging" }
 shim-interface = { path = "../../../libs/shim-interface" }

-dragonball = { path = "../../../dragonball", features = ["atomic-guest-memory", "virtio-vsock", "hotplug", "virtio-blk", "virtio-net", "virtio-fs","dbs-upcall"] }
+dragonball = { path = "../../../dragonball", features = ["atomic-guest-memory", "virtio-vsock", "hotplug", "virtio-blk", "virtio-net", "virtio-fs", "dbs-upcall"] }

 ch-config = { path = "ch-config", optional = true }
diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
index 9cf5bcd8c8..a54a6a43e2 100644
--- a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
+++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs
@@ -494,6 +494,10 @@ impl CloudHypervisorInner {
         Ok(())
     }

+    pub(crate) async fn resize_vcpu(&self, old_vcpu: u32, new_vcpu: u32) -> Result<(u32, u32)> {
+        Ok((old_vcpu, new_vcpu))
+    }
+
     pub(crate) async fn get_pids(&self) -> Result<Vec<u32>> {
         Ok(Vec::<u32>::new())
     }
diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs
index a4b8b05fff..dd95413fc3 100644
--- a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs
+++ b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs
@@ -114,6 +114,11 @@ impl Hypervisor for CloudHypervisor {
         inner.cleanup().await
     }

+    async fn resize_vcpu(&self, old_vcpu: u32, new_vcpu: u32) -> Result<(u32, u32)> {
+        let inner = self.inner.read().await;
+        inner.resize_vcpu(old_vcpu, new_vcpu).await
+    }
+
     async fn get_pids(&self) -> Result<Vec<u32>> {
         let inner = self.inner.read().await;
         inner.get_pids().await
diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
index 45b77f09eb..09afc4c1a0 100644
--- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
+++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs
@@ -13,7 +13,7 @@ use crate::{
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use dragonball::{
-    api::v1::{BlockDeviceConfigInfo, BootSourceConfig},
+    api::v1::{BlockDeviceConfigInfo, BootSourceConfig, VcpuResizeInfo},
     vm::VmConfigInfo,
 };
 use kata_sys_util::mount;
@@ -327,6 +327,53 @@ impl DragonballInner {
         }
     }

+    // check whether the resize request is valid
+    // an error here cannot be tolerated: the container boot will fail
+    fn precheck_resize_vcpus(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> {
+        // old_vcpus > 0, so it is safe to use as the current count
+        let current_vcpus = old_vcpus;
+
+        // a non-zero positive value is required
+        if new_vcpus == 0 {
+            return Err(anyhow!("resize vcpu error: 0 vcpu resizing is invalid"));
+        }
+
+        // cannot exceed the maximum value
+        if new_vcpus > self.config.cpu_info.default_maxvcpus {
return Err(anyhow!("resize vcpu error: cannot greater than maxvcpus")); + } + + Ok((current_vcpus, new_vcpus)) + } + + // do the check before resizing, returns Result<(old, new)> + pub(crate) async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> { + if old_vcpus == new_vcpus { + info!( + sl!(), + "resize_vcpu: no need to resize vcpus because old_vcpus is equal to new_vcpus" + ); + return Ok((new_vcpus, new_vcpus)); + } + + let (old_vcpus, new_vcpus) = self.precheck_resize_vcpus(old_vcpus, new_vcpus)?; + info!( + sl!(), + "check_resize_vcpus passed, passing new_vcpus = {:?} to vmm", new_vcpus + ); + + let cpu_resize_info = VcpuResizeInfo { + vcpu_count: Some(new_vcpus as u8), + }; + self.vmm_instance + .resize_vcpu(&cpu_resize_info) + .context(format!( + "failed to do_resize_vcpus on new_vcpus={:?}", + new_vcpus + ))?; + Ok((old_vcpus, new_vcpus)) + } + pub fn set_hypervisor_config(&mut self, config: HypervisorConfig) { self.config = config; } diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs index c6df95cc95..7072949376 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs @@ -77,6 +77,12 @@ impl Hypervisor for Dragonball { inner.save_vm().await } + // returns Result<(old_vcpus, new_vcpus)> + async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> { + let inner = self.inner.read().await; + inner.resize_vcpu(old_vcpus, new_vcpus).await + } + async fn add_device(&self, device: DeviceType) -> Result<()> { let mut inner = self.inner.write().await; inner.add_device(device).await diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs index 4c40b40c58..ad3977eca7 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/vmm_instance.rs @@ -16,8 +16,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use dragonball::{ api::v1::{ BlockDeviceConfigInfo, BootSourceConfig, FsDeviceConfigInfo, FsMountConfigInfo, - InstanceInfo, InstanceState, VirtioNetDeviceConfigInfo, VmmAction, VmmActionError, VmmData, - VmmRequest, VmmResponse, VmmService, VsockDeviceConfigInfo, + InstanceInfo, InstanceState, VcpuResizeInfo, VirtioNetDeviceConfigInfo, VmmAction, + VmmActionError, VmmData, VmmRequest, VmmResponse, VmmService, VsockDeviceConfigInfo, }, vm::VmConfigInfo, Vmm, @@ -248,6 +248,12 @@ impl VmmInstance { Ok(()) } + pub fn resize_vcpu(&self, cfg: &VcpuResizeInfo) -> Result<()> { + self.handle_request(Request::Sync(VmmAction::ResizeVcpu(cfg.clone()))) + .with_context(|| format!("Failed to resize_vm(hotplug vcpu), cfg: {:?}", cfg))?; + Ok(()) + } + pub fn pause(&self) -> Result<()> { todo!() } diff --git a/src/runtime-rs/crates/hypervisor/src/lib.rs b/src/runtime-rs/crates/hypervisor/src/lib.rs index 2001433e5f..2386d09242 100644 --- a/src/runtime-rs/crates/hypervisor/src/lib.rs +++ b/src/runtime-rs/crates/hypervisor/src/lib.rs @@ -78,6 +78,7 @@ pub trait Hypervisor: Send + Sync { async fn pause_vm(&self) -> Result<()>; async fn save_vm(&self) -> Result<()>; async fn resume_vm(&self) -> Result<()>; + async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)>; // returns (old_vcpus, new_vcpus) // device manager async fn add_device(&self, device: DeviceType) -> Result<()>; diff --git 
a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs index 456bf63739..4b1f7cae38 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/inner.rs @@ -104,6 +104,11 @@ impl QemuInner { todo!() } + pub(crate) async fn resize_vcpu(&self, _old_vcpus: u32, _new_vcpus: u32) -> Result<(u32, u32)> { + info!(sl!(), "QemuInner::resize_vcpu()"); + todo!() + } + pub(crate) async fn get_pids(&self) -> Result> { info!(sl!(), "QemuInner::get_pids()"); todo!() diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs index 77217f1537..eb657dc2db 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs @@ -118,6 +118,11 @@ impl Hypervisor for Qemu { inner.cleanup().await } + async fn resize_vcpu(&self, old_vcpus: u32, new_vcpus: u32) -> Result<(u32, u32)> { + let inner = self.inner.read().await; + inner.resize_vcpu(old_vcpus, new_vcpus).await + } + async fn get_pids(&self) -> Result> { let inner = self.inner.read().await; inner.get_pids().await diff --git a/src/runtime-rs/crates/resource/Cargo.toml b/src/runtime-rs/crates/resource/Cargo.toml index 9847ce61b1..22ffda48b9 100644 --- a/src/runtime-rs/crates/resource/Cargo.toml +++ b/src/runtime-rs/crates/resource/Cargo.toml @@ -41,4 +41,5 @@ logging = { path = "../../../libs/logging" } oci = { path = "../../../libs/oci" } actix-rt = "2.7.0" persist = { path = "../persist"} + [features] diff --git a/src/runtime-rs/crates/resource/src/cgroups/mod.rs b/src/runtime-rs/crates/resource/src/cgroups/mod.rs index b7f515d7f0..831e30c0ff 100644 --- a/src/runtime-rs/crates/resource/src/cgroups/mod.rs +++ b/src/runtime-rs/crates/resource/src/cgroups/mod.rs @@ -26,6 +26,8 @@ use oci::LinuxResources; use persist::sandbox_persist::Persist; use tokio::sync::RwLock; +use crate::ResourceUpdateOp; + const OS_ERROR_NO_SUCH_PROCESS: i32 = 3; pub struct CgroupArgs { @@ -149,29 +151,51 @@ impl CgroupsResource { &self, cid: &str, linux_resources: Option<&LinuxResources>, + op: ResourceUpdateOp, h: &dyn Hypervisor, ) -> Result<()> { - let resource = self.calc_resource(linux_resources); - let changed = self.update_resources(cid, resource).await; + let new_resources = self.calc_resource(linux_resources); + let old_resources = self.update_resources(cid, new_resources.clone(), op).await; - if !changed { - return Ok(()); - } - - self.do_update_cgroups(h).await - } - - async fn update_resources(&self, cid: &str, new_resource: Resources) -> bool { - let mut resources = self.resources.write().await; - let old_resource = resources.insert(cid.to_owned(), new_resource.clone()); - - if let Some(old_resource) = old_resource { - if old_resource == new_resource { - return false; + if let Some(old_resource) = old_resources.clone() { + if old_resource == new_resources { + return Ok(()); } } - true + match self.do_update_cgroups(h).await { + Err(e) => { + // if update failed, we should roll back the records in resources + let mut resources = self.resources.write().await; + match op { + ResourceUpdateOp::Add => { + resources.remove(cid); + } + ResourceUpdateOp::Update | ResourceUpdateOp::Del => { + if let Some(old_resource) = old_resources { + resources.insert(cid.to_owned(), old_resource); + } + } + } + Err(e) + } + Ok(()) => Ok(()), + } + } + + async fn update_resources( + &self, + cid: &str, + new_resource: Resources, + op: ResourceUpdateOp, + ) -> Option { + let mut resources = 
self.resources.write().await;
+        match op {
+            ResourceUpdateOp::Add | ResourceUpdateOp::Update => {
+                resources.insert(cid.to_owned(), new_resource.clone())
+            }
+            ResourceUpdateOp::Del => resources.remove(cid),
+        }
     }

     async fn do_update_cgroups(&self, h: &dyn Hypervisor) -> Result<()> {
diff --git a/src/runtime-rs/crates/resource/src/cpu_mem/cpu.rs b/src/runtime-rs/crates/resource/src/cpu_mem/cpu.rs
new file mode 100644
index 0000000000..661e1a5e3a
--- /dev/null
+++ b/src/runtime-rs/crates/resource/src/cpu_mem/cpu.rs
@@ -0,0 +1,188 @@
+// Copyright (c) 2019-2022 Alibaba Cloud
+// Copyright (c) 2019-2022 Ant Group
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+use std::{
+    cmp,
+    collections::{HashMap, HashSet},
+    convert::TryFrom,
+    sync::Arc,
+};
+
+use agent::{Agent, OnlineCPUMemRequest};
+use anyhow::{Context, Ok, Result};
+use hypervisor::Hypervisor;
+use kata_types::{config::TomlConfig, cpu::LinuxContainerCpuResources};
+use oci::LinuxCpu;
+use tokio::sync::RwLock;
+
+use crate::ResourceUpdateOp;
+
+#[derive(Default, Debug, Clone)]
+pub struct CpuResource {
+    /// Current number of vCPUs
+    pub(crate) current_vcpu: Arc<RwLock<u32>>,
+
+    /// Default number of vCPUs
+    pub(crate) default_vcpu: u32,
+
+    /// CpuResource of each container
+    pub(crate) container_cpu_resources: Arc<RwLock<HashMap<String, LinuxContainerCpuResources>>>,
+}
+
+impl CpuResource {
+    pub fn new(config: Arc<TomlConfig>) -> Result<Self> {
+        let hypervisor_name = config.runtime.hypervisor_name.clone();
+        let hypervisor_config = config
+            .hypervisor
+            .get(&hypervisor_name)
+            .context(format!("failed to get hypervisor {}", hypervisor_name))?;
+        Ok(Self {
+            current_vcpu: Arc::new(RwLock::new(hypervisor_config.cpu_info.default_vcpus as u32)),
+            default_vcpu: hypervisor_config.cpu_info.default_vcpus as u32,
+            container_cpu_resources: Arc::new(RwLock::new(HashMap::new())),
+        })
+    }
+
+    pub(crate) async fn update_cpu_resources(
+        &self,
+        cid: &str,
+        linux_cpus: Option<&LinuxCpu>,
+        op: ResourceUpdateOp,
+        hypervisor: &dyn Hypervisor,
+        agent: &dyn Agent,
+    ) -> Result<()> {
+        self.update_container_cpu_resources(cid, linux_cpus, op)
+            .await
+            .context("update container cpu resources")?;
+        let vcpu_required = self
+            .calc_cpu_resources()
+            .await
+            .context("calculate vcpus required")?;
+
+        if vcpu_required == self.current_vcpu().await {
+            return Ok(());
+        }
+
+        let curr_vcpus = self
+            .do_update_cpu_resources(vcpu_required, op, hypervisor, agent)
+            .await?;
+        self.update_current_vcpu(curr_vcpus).await;
+        Ok(())
+    }
+
+    async fn current_vcpu(&self) -> u32 {
+        let current_vcpu = self.current_vcpu.read().await;
+        *current_vcpu
+    }
+
+    async fn update_current_vcpu(&self, new_vcpus: u32) {
+        let mut current_vcpu = self.current_vcpu.write().await;
+        *current_vcpu = new_vcpus;
+    }
+
+    // update container_cpu_resources field
+    async fn update_container_cpu_resources(
+        &self,
+        cid: &str,
+        linux_cpus: Option<&LinuxCpu>,
+        op: ResourceUpdateOp,
+    ) -> Result<()> {
+        if let Some(cpu) = linux_cpus {
+            let container_resource = LinuxContainerCpuResources::try_from(cpu)?;
+            let mut resources = self.container_cpu_resources.write().await;
+            match op {
+                ResourceUpdateOp::Add => {
+                    resources.insert(cid.to_owned(), container_resource);
+                }
+                ResourceUpdateOp::Update => {
+                    let resource = resources.insert(cid.to_owned(), container_resource.clone());
+                    if let Some(old_container_resource) = resource {
+                        // the priority of cpu-quota is higher than cpuset when determining the number of vcpus,
+                        // so we ignore the resource update when only the cpuset is updated and a cpu-quota
+                        // has been set previously.
+                        if old_container_resource.quota() > 0 && container_resource.quota() < 0 {
+                            resources.insert(cid.to_owned(), old_container_resource);
+                        }
+                    }
+                }
+                ResourceUpdateOp::Del => {
+                    resources.remove(cid);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    // calculates the total required vcpus by adding each container's requirements within the pod
+    async fn calc_cpu_resources(&self) -> Result<u32> {
+        let mut total_vcpu = 0;
+        let mut cpuset_vcpu: HashSet<u32> = HashSet::new();
+
+        let resources = self.container_cpu_resources.read().await;
+        for (_, cpu_resource) in resources.iter() {
+            let vcpu = cpu_resource.get_vcpus().unwrap_or(0) as u32;
+            cpuset_vcpu.extend(cpu_resource.cpuset().iter());
+            total_vcpu += vcpu;
+        }
+
+        // constrained only by cpuset
+        if total_vcpu == 0 && !cpuset_vcpu.is_empty() {
+            info!(sl!(), "(from cpuset) get vcpus # {:?}", cpuset_vcpu);
+            return Ok(cpuset_vcpu.len() as u32);
+        }
+
+        info!(
+            sl!(),
+            "(from cfs_quota & cfs_period) get vcpus count {}", total_vcpu
+        );
+        Ok(total_vcpu)
+    }
+
+    // do the hotplug or hot-unplug of vcpus
+    async fn do_update_cpu_resources(
+        &self,
+        new_vcpus: u32,
+        op: ResourceUpdateOp,
+        hypervisor: &dyn Hypervisor,
+        agent: &dyn Agent,
+    ) -> Result<u32> {
+        let old_vcpus = self.current_vcpu().await;
+
+        // when adding vcpus, ignore old_vcpus > new_vcpus
+        // when deleting vcpus, ignore old_vcpus < new_vcpus
+        if (op == ResourceUpdateOp::Add && old_vcpus > new_vcpus)
+            || (op == ResourceUpdateOp::Del && old_vcpus < new_vcpus)
+        {
+            return Ok(old_vcpus);
+        }
+
+        // do not reduce computing power:
+        // the number of vcpus will not go below the default size
+        let new_vcpus = cmp::max(new_vcpus, self.default_vcpu);
+
+        let (old, new) = hypervisor
+            .resize_vcpu(old_vcpus, new_vcpus)
+            .await
+            .context("resize vcpus")?;
+
+        if old < new {
+            let add = new - old;
+            info!(sl!(), "request to onlineCpuMem with {:?} cpus", add);
+
+            agent
+                .online_cpu_mem(OnlineCPUMemRequest {
+                    wait: false,
+                    nb_cpus: new,
+                    cpu_only: true,
+                })
+                .await
+                .context("online vcpus")?;
+        }
+
+        Ok(new)
+    }
+}
diff --git a/src/runtime-rs/crates/runtimes/src/static_resource.rs b/src/runtime-rs/crates/resource/src/cpu_mem/initial_size.rs
similarity index 82%
rename from src/runtime-rs/crates/runtimes/src/static_resource.rs
rename to src/runtime-rs/crates/resource/src/cpu_mem/initial_size.rs
index 453ce85b3f..53eccc52bd 100644
--- a/src/runtime-rs/crates/runtimes/src/static_resource.rs
+++ b/src/runtime-rs/crates/resource/src/cpu_mem/initial_size.rs
@@ -13,17 +13,16 @@ use kata_types::{
     cpu::LinuxContainerCpuResources,
     k8s::container_type,
 };

-// static resource that StaticResourceManager needs, this is the spec for the
+// initial resource that InitialSizeManager needs, this is the spec for the
 // sandbox/container's workload
 #[derive(Clone, Copy, Debug)]
-struct StaticResource {
+struct InitialSize {
     vcpu: u32,
     mem_mb: u32,
 }

-// generate static resource(vcpu and memory in MiB) from spec's information
-// used for static resource management
-impl TryFrom<&oci::Spec> for StaticResource {
+// generate initial resource(vcpu and memory in MiB) from spec's information
+impl TryFrom<&oci::Spec> for InitialSize {
     type Error = anyhow::Error;
     fn try_from(spec: &oci::Spec) -> Result<Self> {
         let mut vcpu: u32 = 0;
@@ -65,31 +64,32 @@ impl TryFrom<&oci::Spec> for StaticResource {
         }
         info!(
             sl!(),
-            "static resource mgmt result: vcpu={}, mem_mb={}", vcpu, mem_mb
+            "(from PodSandbox's annotation / SingleContainer's spec) initial size: vcpu={}, mem_mb={}", vcpu, mem_mb
         );
         Ok(Self { vcpu, mem_mb })
     }
 }

-// StaticResourceManager is responsible for static resource management
+// InitialSizeManager is responsible for initial vcpu/mem management
 //
-// static resource management sizing information is optionally provided, either by
+// initial vcpu/mem management sizing information is optionally provided, either by
 // upper layer runtime (containerd / crio) or by the container spec itself (when it
 // is a standalone single container such as the one started with *docker run*)
 //
 // the sizing information uses three values, cpu quota, cpu period and memory limit,
-// and with above values it calculates the # vcpus and memory for the workload and
-// add them to default value of the config
+// and with the above values it calculates the # of vcpus and memory for the workload
+//
+// if the workload's # of vcpus or memory is invalid for the VMM, we fall back to the
+// default values in toml_config
 #[derive(Clone, Copy, Debug)]
-pub struct StaticResourceManager {
-    resource: StaticResource,
+pub struct InitialSizeManager {
+    resource: InitialSize,
 }

-impl StaticResourceManager {
+impl InitialSizeManager {
     pub fn new(spec: &oci::Spec) -> Result<Self> {
         Ok(Self {
-            resource: StaticResource::try_from(spec)
-                .context("failed to construct static resource")?,
+            resource: InitialSize::try_from(spec).context("failed to construct static resource")?,
         })
     }
@@ -100,8 +100,13 @@
             .hypervisor
             .get_mut(hypervisor_name)
             .context("failed to get hypervisor config")?;
-        hv.cpu_info.default_vcpus += self.resource.vcpu as i32;
-        hv.memory_info.default_memory += self.resource.mem_mb;
+
+        if self.resource.vcpu > 0 {
+            hv.cpu_info.default_vcpus = self.resource.vcpu as i32
+        }
+        if self.resource.mem_mb > 0 {
+            hv.memory_info.default_memory = self.resource.mem_mb;
+        }
         Ok(())
     }
 }
@@ -151,7 +156,7 @@ mod tests {
     struct TestData<'a> {
         desc: &'a str,
         input: InputData,
-        result: StaticResource,
+        result: InitialSize,
     }

     fn get_test_data() -> Vec<TestData<'static>> {
@@ -163,7 +168,7 @@
                 quota: None,
                 memory: None,
             },
-            result: StaticResource { vcpu: 0, mem_mb: 0 },
+            result: InitialSize { vcpu: 0, mem_mb: 0 },
         },
         TestData {
             desc: "normal resource limit",
@@ -173,7 +178,7 @@
                 quota: Some(220_000),
                 memory: Some(1024 * 1024 * 512),
             },
-            result: StaticResource {
+            result: InitialSize {
                 vcpu: 3,
                 mem_mb: 512,
             },
@@ -183,7 +188,7 @@
     }

     #[test]
-    fn test_static_resource_mgmt_sandbox() {
+    fn test_initial_size_sandbox() {
         let tests = get_test_data();

         // run tests
@@ -210,22 +215,22 @@
             ..Default::default()
         };

-        let static_resource = StaticResource::try_from(&spec);
+        let initial_size = InitialSize::try_from(&spec);
         assert!(
-            static_resource.is_ok(),
+            initial_size.is_ok(),
"test[{}]: {:?} should be ok", i, d.desc ); - let static_resource = static_resource.unwrap(); + let initial_size = initial_size.unwrap(); assert_eq!( - static_resource.vcpu, d.result.vcpu, + initial_size.vcpu, d.result.vcpu, "test[{}]: {:?} vcpu should be {}", i, d.desc, d.result.vcpu, ); assert_eq!( - static_resource.mem_mb, d.result.mem_mb, + initial_size.mem_mb, d.result.mem_mb, "test[{}]: {:?} memory should be {}", i, d.desc, d.result.mem_mb, ); diff --git a/src/runtime-rs/crates/resource/src/cpu_mem/mod.rs b/src/runtime-rs/crates/resource/src/cpu_mem/mod.rs new file mode 100644 index 0000000000..f2984cd1cb --- /dev/null +++ b/src/runtime-rs/crates/resource/src/cpu_mem/mod.rs @@ -0,0 +1,8 @@ +// Copyright (c) 2019-2023 Alibaba Cloud +// Copyright (c) 2019-2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +pub mod cpu; +pub mod initial_size; diff --git a/src/runtime-rs/crates/resource/src/lib.rs b/src/runtime-rs/crates/resource/src/lib.rs index b118e2e380..4e4aae9e87 100644 --- a/src/runtime-rs/crates/resource/src/lib.rs +++ b/src/runtime-rs/crates/resource/src/lib.rs @@ -22,6 +22,7 @@ pub mod rootfs; pub mod share_fs; pub mod volume; pub use manager::ResourceManager; +pub mod cpu_mem; use kata_types::config::hypervisor::SharedFsInfo; @@ -30,3 +31,10 @@ pub enum ResourceConfig { Network(NetworkConfig), ShareFs(SharedFsInfo), } + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ResourceUpdateOp { + Add, + Del, + Update, +} diff --git a/src/runtime-rs/crates/resource/src/manager.rs b/src/runtime-rs/crates/resource/src/manager.rs index a022f722f8..d79de40cdb 100644 --- a/src/runtime-rs/crates/resource/src/manager.rs +++ b/src/runtime-rs/crates/resource/src/manager.rs @@ -4,9 +4,8 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::network::NetworkConfig; -use crate::resource_persist::ResourceState; -use crate::{manager_inner::ResourceManagerInner, rootfs::Rootfs, volume::Volume, ResourceConfig}; +use std::sync::Arc; + use agent::types::Device; use agent::{Agent, Storage}; use anyhow::Result; @@ -17,9 +16,13 @@ use kata_types::config::TomlConfig; use kata_types::mount::Mount; use oci::{Linux, LinuxResources}; use persist::sandbox_persist::Persist; -use std::sync::Arc; use tokio::sync::RwLock; +use crate::network::NetworkConfig; +use crate::resource_persist::ResourceState; +use crate::ResourceUpdateOp; +use crate::{manager_inner::ResourceManagerInner, rootfs::Rootfs, volume::Volume, ResourceConfig}; + pub struct ManagerArgs { pub sid: String, pub agent: Arc, @@ -110,13 +113,14 @@ impl ResourceManager { inner.dump().await } - pub async fn update_cgroups( + pub async fn update_linux_resource( &self, cid: &str, linux_resources: Option<&LinuxResources>, - ) -> Result<()> { + op: ResourceUpdateOp, + ) -> Result> { let inner = self.inner.read().await; - inner.update_cgroups(cid, linux_resources).await + inner.update_linux_resource(cid, linux_resources, op).await } pub async fn cleanup(&self) -> Result<()> { diff --git a/src/runtime-rs/crates/resource/src/manager_inner.rs b/src/runtime-rs/crates/resource/src/manager_inner.rs index 0c77ec1423..b60925c111 100644 --- a/src/runtime-rs/crates/resource/src/manager_inner.rs +++ b/src/runtime-rs/crates/resource/src/manager_inner.rs @@ -4,13 +4,11 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::{sync::Arc, thread, vec}; +use std::{sync::Arc, thread}; -use crate::{network::NetworkConfig, resource_persist::ResourceState}; use agent::{types::Device, Agent, Storage}; use anyhow::{anyhow, Context, Ok, Result}; use 
async_trait::async_trait; - use hypervisor::{ device::{ device_manager::{do_handle_device, DeviceManager}, @@ -20,18 +18,20 @@ use hypervisor::{ }; use kata_types::config::TomlConfig; use kata_types::mount::Mount; -use oci::{Linux, LinuxResources}; +use oci::{Linux, LinuxCpu, LinuxResources}; use persist::sandbox_persist::Persist; use tokio::{runtime, sync::RwLock}; use crate::{ cgroups::{CgroupArgs, CgroupsResource}, + cpu_mem::cpu::CpuResource, manager::ManagerArgs, - network::{self, Network}, + network::{self, Network, NetworkConfig}, + resource_persist::ResourceState, rootfs::{RootFsResource, Rootfs}, share_fs::{self, sandbox_bind_mounts::SandboxBindMounts, ShareFs}, volume::{Volume, VolumeResource}, - ResourceConfig, + ResourceConfig, ResourceUpdateOp, }; pub(crate) struct ResourceManagerInner { @@ -46,6 +46,7 @@ pub(crate) struct ResourceManagerInner { pub rootfs_resource: RootFsResource, pub volume_resource: VolumeResource, pub cgroups_resource: CgroupsResource, + pub cpu_resource: CpuResource, } impl ResourceManagerInner { @@ -55,12 +56,12 @@ impl ResourceManagerInner { hypervisor: Arc, toml_config: Arc, ) -> Result { - let cgroups_resource = CgroupsResource::new(sid, &toml_config)?; - // create device manager let dev_manager = DeviceManager::new(hypervisor.clone()).context("failed to create device manager")?; + let cgroups_resource = CgroupsResource::new(sid, &toml_config)?; + let cpu_resource = CpuResource::new(toml_config.clone())?; Ok(Self { sid: sid.to_string(), toml_config, @@ -72,6 +73,7 @@ impl ResourceManagerInner { rootfs_resource: RootFsResource::new(), volume_resource: VolumeResource::new(), cgroups_resource, + cpu_resource, }) } @@ -316,16 +318,6 @@ impl ResourceManagerInner { } } - pub async fn update_cgroups( - &self, - cid: &str, - linux_resources: Option<&LinuxResources>, - ) -> Result<()> { - self.cgroups_resource - .update_cgroups(cid, linux_resources, self.hypervisor.as_ref()) - .await - } - pub async fn cleanup(&self) -> Result<()> { // clean up cgroup self.cgroups_resource @@ -354,6 +346,57 @@ impl ResourceManagerInner { self.rootfs_resource.dump().await; self.volume_resource.dump().await; } + + pub async fn update_linux_resource( + &self, + cid: &str, + linux_resources: Option<&LinuxResources>, + op: ResourceUpdateOp, + ) -> Result> { + let linux_cpus = || -> Option<&LinuxCpu> { linux_resources.as_ref()?.cpu.as_ref() }(); + + // if static_sandbox_resource_mgmt, we will not have to update sandbox's cpu or mem resource + if !self.toml_config.runtime.static_sandbox_resource_mgmt { + self.cpu_resource + .update_cpu_resources( + cid, + linux_cpus, + op, + self.hypervisor.as_ref(), + self.agent.as_ref(), + ) + .await?; + } + + // we should firstly update the vcpus and mems, and then update the host cgroups + self.cgroups_resource + .update_cgroups(cid, linux_resources, op, self.hypervisor.as_ref()) + .await?; + + // update the linux resources for agent + self.agent_linux_resources(linux_resources) + } + + fn agent_linux_resources( + &self, + linux_resources: Option<&LinuxResources>, + ) -> Result> { + let mut resources = match linux_resources { + Some(linux_resources) => linux_resources.clone(), + None => { + return Ok(None); + } + }; + + // clear the cpuset + // for example, if there are only 5 vcpus now, and the cpuset in LinuxResources is 0-2,6, guest os will report + // error when creating the container. so we choose to clear the cpuset here. 
+ if let Some(cpu) = &mut resources.cpu { + cpu.cpus = String::new(); + } + + Ok(Some(resources)) + } } #[async_trait] @@ -400,6 +443,7 @@ impl Persist for ResourceManagerInner { ) .await?, toml_config: Arc::new(TomlConfig::default()), + cpu_resource: CpuResource::default(), }) } } diff --git a/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs b/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs index c12df38b12..80e4149c3b 100644 --- a/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs +++ b/src/runtime-rs/crates/runtimes/common/src/runtime_handler.rs @@ -3,6 +3,7 @@ // // SPDX-License-Identifier: Apache-2.0 // + use std::sync::Arc; use anyhow::Result; diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index efe06fa439..1a79f23d67 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -26,12 +26,10 @@ pub trait Sandbox: Send + Sync { async fn cleanup(&self) -> Result<()>; async fn shutdown(&self) -> Result<()>; - // agent function - async fn agent_sock(&self) -> Result; - // utils async fn set_iptables(&self, is_ipv6: bool, data: Vec) -> Result>; async fn get_iptables(&self, is_ipv6: bool) -> Result>; async fn direct_volume_stats(&self, volume_path: &str) -> Result; async fn direct_volume_resize(&self, resize_req: agent::ResizeVolumeRequest) -> Result<()>; + async fn agent_sock(&self) -> Result; } diff --git a/src/runtime-rs/crates/runtimes/src/lib.rs b/src/runtime-rs/crates/runtimes/src/lib.rs index dffa69697f..867c8ef9e6 100644 --- a/src/runtime-rs/crates/runtimes/src/lib.rs +++ b/src/runtime-rs/crates/runtimes/src/lib.rs @@ -13,4 +13,3 @@ pub mod manager; pub use manager::RuntimeHandlerManager; pub use shim_interface; mod shim_mgmt; -mod static_resource; diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 1fa7ce4a08..d05e1961f9 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -6,7 +6,6 @@ use std::{path::PathBuf, str::from_utf8, sync::Arc}; -use crate::{shim_mgmt::server::MgmtServer, static_resource::StaticResourceManager}; use anyhow::{anyhow, Context, Result}; use common::{ message::Message, @@ -18,12 +17,11 @@ use kata_sys_util::spec::load_oci_spec; use kata_types::{ annotations::Annotation, config::default::DEFAULT_GUEST_DNS_FILE, config::TomlConfig, }; -use netns_rs::NetNs; -use resource::network::generate_netns_name; - #[cfg(feature = "linux")] use linux_container::LinuxContainer; +use netns_rs::NetNs; use persist::sandbox_persist::Persist; +use resource::{cpu_mem::initial_size::InitialSizeManager, network::generate_netns_name}; use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER; use tokio::fs; use tokio::sync::{mpsc::Sender, RwLock}; @@ -36,6 +34,8 @@ use virt_container::{ #[cfg(feature = "wasm")] use wasm_container::WasmContainer; +use crate::shim_mgmt::server::MgmtServer; + struct RuntimeHandlerManagerInner { id: String, msg_sender: Sender, @@ -422,14 +422,11 @@ fn load_config(spec: &oci::Spec, option: &Option>) -> Result // 2. If this is not a sandbox infrastructure container, but instead a standalone single container (analogous to "docker run..."), // then the container spec itself will contain appropriate sizing information for the entire sandbox (since it is // a single container. 
- if toml_config.runtime.static_sandbox_resource_mgmt { - info!(sl!(), "static resource management enabled"); - let static_resource_manager = StaticResourceManager::new(spec) - .context("failed to construct static resource manager")?; - static_resource_manager - .setup_config(&mut toml_config) - .context("failed to setup static resource mgmt config")?; - } + let initial_size_manager = + InitialSizeManager::new(spec).context("failed to construct static resource manager")?; + initial_size_manager + .setup_config(&mut toml_config) + .context("failed to setup static resource mgmt config")?; info!(sl!(), "get config content {:?}", &toml_config); Ok(toml_config) diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 6dfea9fd0d..fba45a784b 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -19,7 +19,7 @@ use common::{ use kata_sys_util::k8s::update_ephemeral_storage_type; use oci::{LinuxResources, Process as OCIProcess}; -use resource::ResourceManager; +use resource::{ResourceManager, ResourceUpdateOp}; use tokio::sync::RwLock; use super::{ @@ -64,6 +64,10 @@ impl Container { config.stderr.clone(), config.terminal, ); + let linux_resources = spec + .linux + .as_ref() + .and_then(|linux| linux.resources.clone()); Ok(Self { pid, @@ -74,6 +78,7 @@ impl Container { agent.clone(), init_process, logger.clone(), + linux_resources, ))), agent, resource_manager, @@ -150,15 +155,18 @@ impl Container { .handler_devices(&config.container_id, linux) .await?; - // update cgroups - self.resource_manager - .update_cgroups( + // update vcpus, mems and host cgroups + let resources = self + .resource_manager + .update_linux_resource( &config.container_id, - spec.linux - .as_ref() - .and_then(|linux| linux.resources.as_ref()), + inner.linux_resources.as_ref(), + ResourceUpdateOp::Add, ) .await?; + if let Some(linux) = &mut spec.linux { + linux.resources = resources; + } // create container let r = agent::CreateContainerRequest { @@ -323,7 +331,20 @@ impl Container { inner .stop_process(container_process, true, &device_manager) .await - .context("stop process") + .context("stop process")?; + + // update vcpus, mems and host cgroups + if container_process.process_type == ProcessType::Container { + self.resource_manager + .update_linux_resource( + &self.config.container_id, + inner.linux_resources.as_ref(), + ResourceUpdateOp::Del, + ) + .await?; + } + + Ok(()) } pub async fn pause(&self) -> Result<()> { @@ -398,13 +419,21 @@ impl Container { } pub async fn update(&self, resources: &LinuxResources) -> Result<()> { - self.resource_manager - .update_cgroups(&self.config.container_id, Some(resources)) + let mut inner = self.inner.write().await; + inner.linux_resources = Some(resources.clone()); + // update vcpus, mems and host cgroups + let agent_resources = self + .resource_manager + .update_linux_resource( + &self.config.container_id, + Some(resources), + ResourceUpdateOp::Update, + ) .await?; let req = agent::UpdateContainerRequest { container_id: self.container_id.container_id.clone(), - resources: resources.clone(), + resources: agent_resources, mounts: Vec::new(), }; self.agent diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs 
b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
index 12d4810fbb..bc478cbcdc 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
@@ -14,6 +14,7 @@ use common::{
 };
 use hypervisor::device::device_manager::DeviceManager;
 use nix::sys::signal::Signal;
+use oci::LinuxResources;
 use resource::{rootfs::Rootfs, volume::Volume};
 use tokio::sync::RwLock;
@@ -32,10 +33,16 @@ pub struct ContainerInner {
     pub(crate) exec_processes: HashMap<String, Process>,
     pub(crate) rootfs: Vec<Arc<dyn Rootfs>>,
     pub(crate) volumes: Vec<Arc<dyn Volume>>,
+    pub(crate) linux_resources: Option<LinuxResources>,
 }

 impl ContainerInner {
-    pub(crate) fn new(agent: Arc<dyn Agent>, init_process: Process, logger: slog::Logger) -> Self {
+    pub(crate) fn new(
+        agent: Arc<dyn Agent>,
+        init_process: Process,
+        logger: slog::Logger,
+        linux_resources: Option<LinuxResources>,
+    ) -> Self {
         Self {
             agent,
             logger,
@@ -43,6 +50,7 @@
             exec_processes: HashMap::new(),
             rootfs: vec![],
             volumes: vec![],
+            linux_resources,
         }
     }
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs
index f85e7901a3..f5aa05e6c6 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs
@@ -6,6 +6,7 @@
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
+
 use std::{collections::HashMap, sync::Arc};

 use agent::Agent;
@@ -96,7 +97,6 @@ impl ContainerManager for VirtContainerManager {
         let mut containers = self.containers.write().await;
         container.create(spec).await.context("create")?;
         containers.insert(container.container_id.to_string(), container);
-
         Ok(PID { pid: self.pid })
     }
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
index 8202d854d6..3cbdba493b 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs
@@ -361,7 +361,7 @@ impl Sandbox for VirtSandbox {
             .await
             .context("resource clean up")?;

-        // TODO: cleanup other snadbox resource
+        // TODO: cleanup other sandbox resource
         Ok(())
     }
diff --git a/src/runtime/virtcontainers/agent.go b/src/runtime/virtcontainers/agent.go
index ddf11d9ce2..8f0aca26ee 100644
--- a/src/runtime/virtcontainers/agent.go
+++ b/src/runtime/virtcontainers/agent.go
@@ -10,6 +10,7 @@ import (
 	"time"

 	"context"
+
 	persistapi "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/persist/api"
 	pbTypes "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols"
 	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/grpc"
@@ -119,7 +120,7 @@ type agent interface {
 	// onlineCPUMem will online CPUs and Memory inside the Sandbox.
 	// This function should be called after hot adding vCPUs or Memory.
-	// cpus specifies the number of CPUs that were added and the agent should online
+	// cpus specifies the number of CPUs that should be onlined in the guest, and special value 0 means agent will skip this check.
// cpuOnly specifies that we should online cpu or online memory or both onlineCPUMem(ctx context.Context, cpus uint32, cpuOnly bool) error diff --git a/src/runtime/virtcontainers/pkg/agent/protocols/grpc/agent.pb.go b/src/runtime/virtcontainers/pkg/agent/protocols/grpc/agent.pb.go index a8dd81104a..3a1b7c607f 100644 --- a/src/runtime/virtcontainers/pkg/agent/protocols/grpc/agent.pb.go +++ b/src/runtime/virtcontainers/pkg/agent/protocols/grpc/agent.pb.go @@ -1924,7 +1924,8 @@ type OnlineCPUMemRequest struct { // If true the agent returns once all resources have been connected, otherwise all // resources are connected asynchronously and the agent returns immediately. Wait bool `protobuf:"varint,1,opt,name=wait,proto3" json:"wait,omitempty"` - // NbCpus specifies the number of CPUs that were added and the agent has to online. + // NbCpus specifies the number of CPUs that should be onlined in the guest. + // Special value 0 means agent will skip this check. NbCpus uint32 `protobuf:"varint,2,opt,name=nb_cpus,json=nbCpus,proto3" json:"nb_cpus,omitempty"` // CpuOnly specifies whether only online CPU or not. CpuOnly bool `protobuf:"varint,3,opt,name=cpu_only,json=cpuOnly,proto3" json:"cpu_only,omitempty"` diff --git a/src/runtime/virtcontainers/sandbox.go b/src/runtime/virtcontainers/sandbox.go index b0697fd840..9bd5f402eb 100644 --- a/src/runtime/virtcontainers/sandbox.go +++ b/src/runtime/virtcontainers/sandbox.go @@ -2117,9 +2117,8 @@ func (s *Sandbox) updateResources(ctx context.Context) error { s.Logger().Debugf("Request to hypervisor to update oldCPUs/newCPUs: %d/%d", oldCPUs, newCPUs) // If the CPUs were increased, ask agent to online them if oldCPUs < newCPUs { - vcpusAdded := newCPUs - oldCPUs - s.Logger().Debugf("Request to onlineCPUMem with %d CPUs", vcpusAdded) - if err := s.agent.onlineCPUMem(ctx, vcpusAdded, true); err != nil { + s.Logger().Debugf("Request to onlineCPUMem with %d CPUs", newCPUs) + if err := s.agent.onlineCPUMem(ctx, newCPUs, true); err != nil { return err } } diff --git a/src/runtime/virtcontainers/vm.go b/src/runtime/virtcontainers/vm.go index b5dec99122..72afa9581c 100644 --- a/src/runtime/virtcontainers/vm.go +++ b/src/runtime/virtcontainers/vm.go @@ -293,7 +293,7 @@ func (v *VM) AddMemory(ctx context.Context, numMB uint32) error { // OnlineCPUMemory puts the hotplugged CPU and memory online. func (v *VM) OnlineCPUMemory(ctx context.Context) error { v.logger().Infof("online CPU %d and memory", v.cpuDelta) - err := v.agent.onlineCPUMem(ctx, v.cpuDelta, false) + err := v.agent.onlineCPUMem(ctx, v.cpu, false) if err == nil { v.cpuDelta = 0 }
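Editor's note: to make the sizing policy in this patch easier to follow, here is a condensed, illustrative sketch (not part of the patch) of what `CpuResource::calc_cpu_resources` and `do_update_cpu_resources` implement together: sum each container's `ceiling(quota / period)`, fall back to the distinct-`cpuset` count when no quota is declared, and never shrink below the boot size. All names and signatures below are hypothetical simplifications.

```rust
// Hypothetical sketch of the runtime-rs vCPU sizing rule; the real logic
// lives in src/runtime-rs/crates/resource/src/cpu_mem/cpu.rs.
fn required_vcpus(
    boot_size: u32,        // vCPUs the VM booted with (InitialSize or default_vcpus)
    quotas: &[(i64, u64)], // (cfs_quota, cfs_period) per container
    cpuset_cpus: usize,    // distinct CPUs named in cpusets across containers
) -> u32 {
    // vCPUs from quota: ceiling(quota / period), summed over all containers.
    let from_quota: u32 = quotas
        .iter()
        .copied()
        .filter(|&(q, p)| q > 0 && p > 0)
        .map(|(q, p)| ((q as u64 + p - 1) / p) as u32)
        .sum();

    // Quota takes precedence; use the cpuset count only when no quota is set.
    let requested = if from_quota > 0 {
        from_quota
    } else {
        cpuset_cpus as u32
    };

    // Never cut below the boot size ("do not reduce computing power").
    requested.max(boot_size)
}

fn main() {
    // Two containers: ceil(220000/100000) = 3 vCPUs plus ceil(150000/100000) = 2 vCPUs.
    assert_eq!(required_vcpus(1, &[(220_000, 100_000), (150_000, 100_000)], 0), 5);
    // Only a cpuset declared: the distinct-CPU count wins, floored at the boot size.
    assert_eq!(required_vcpus(2, &[], 1), 2);
}
```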