runtime-rs: Add vCPU thread pinning support

Port the Go runtime's enable_vcpus_pinning feature to runtime-rs.

The Go runtime already lets users pin each vCPU thread to a specific
host CPU when the vCPU count matches the sandbox cpuset size, using
sched_setaffinity. This is useful for latency-sensitive workloads that
benefit from eliminating cross-CPU migration of vCPU threads.

The approach mirrors the Go implementation:

After VM start and on every container add/update/delete, we fetch the
vCPU thread IDs (via QMP query-cpus-fast for QEMU), compute the union of
all containers' OCI cpusets, and if the two counts match, pin vCPU i to
cpuset[i]. If they diverge (hotplug, container removal, etc.), we reset
all threads back to the full cpuset so that nothing gets stuck on a
single core.

The pinning check lives in CgroupsResourceInner::update_sandbox_cgroups,
which already runs at exactly the right points in the lifecycle. The
enable_vcpus_pinning flag flows from the TOML config through
CgroupConfig into the cgroup resource layer, and can also be overridden
per-pod via the io.katacontainers.config.runtime.enable_vcpus_pinning
annotation.

The QEMU config templates default to false. The NV GPU configs will get
their own default (true) in a follow-up once those templates are added.

Signed-off-by: Fabiano Fidêncio <ffidencio@nvidia.com>
Made-with: Cursor
This commit is contained in:
Fabiano Fidêncio
2026-04-13 11:10:38 +02:00
parent 1c2d5cb57d
commit 48669a894e
15 changed files with 323 additions and 4 deletions

1
Cargo.lock generated
View File

@@ -6030,6 +6030,7 @@ dependencies = [
"oci-spec 0.8.4",
"persist",
"rand 0.10.1",
"rstest",
"rtnetlink 0.19.0",
"scopeguard",
"serde",

View File

@@ -336,6 +336,9 @@ pub const KATA_ANNO_CFG_HYPERVISOR_NETWORK_QUEUES: &str =
/// SandboxCgroupOnly is a sandbox annotation that determines if kata processes are managed only in sandbox cgroup.
pub const KATA_ANNO_CFG_SANDBOX_CGROUP_ONLY: &str =
"io.katacontainers.config.runtime.sandbox_cgroup_only";
/// A sandbox annotation that controls pinning of vCPU threads to host CPUs.
pub const KATA_ANNO_CFG_ENABLE_VCPUS_PINNING: &str =
"io.katacontainers.config.runtime.enable_vcpus_pinning";
/// A sandbox annotation that determines if create a netns for hypervisor process.
pub const KATA_ANNO_CFG_DISABLE_NEW_NETNS: &str =
"io.katacontainers.config.runtime.disable_new_netns";
@@ -1148,6 +1151,14 @@ impl Annotation {
return Err(bool_err);
}
},
KATA_ANNO_CFG_ENABLE_VCPUS_PINNING => match self.get_value::<bool>(key) {
Ok(r) => {
config.runtime.enable_vcpus_pinning = r.unwrap_or_default();
}
Err(_e) => {
return Err(bool_err);
}
},
KATA_ANNO_CFG_DISABLE_NEW_NETNS => match self.get_value::<bool>(key) {
Ok(r) => {
config.runtime.disable_new_netns = r.unwrap_or_default();

View File

@@ -100,6 +100,15 @@ pub struct Runtime {
#[serde(default)]
pub sandbox_cgroup_only: bool,
/// If enabled, each vCPU thread will be pinned to a fixed host CPU.
///
/// Pinning is only applied when the number of vCPU threads equals
/// the number of CPUs in the sandbox's cpuset. When the counts
/// diverge (e.g. after hotplug or container removal), pinning is
/// reset so all vCPU threads float across the full cpuset.
#[serde(default)]
pub enable_vcpus_pinning: bool,
/// If enabled, the runtime will create opentracing.io traces and spans.
/// See https://www.jaegertracing.io/docs/getting-started.
#[serde(default)]
@@ -342,6 +351,7 @@ internetworking_model = "macvtap"
disable_new_netns = true
sandbox_bind_mounts = []
sandbox_cgroup_only = true
enable_vcpus_pinning = true
enable_tracing = true
jaeger_endpoint = "localhost:1234"
jaeger_user = "user"
@@ -362,6 +372,7 @@ field_should_be_ignored = true
assert!(config.runtime.disable_new_netns);
assert_eq!(config.runtime.sandbox_bind_mounts.len(), 0);
assert!(config.runtime.sandbox_cgroup_only);
assert!(config.runtime.enable_vcpus_pinning);
assert!(config.runtime.enable_tracing);
assert!(config.runtime.is_experiment_enabled("a"));
assert!(config.runtime.is_experiment_enabled("b"));

View File

@@ -379,6 +379,7 @@ ifneq (,$(QEMUCMD))
# qemu-specific options
DEFSANDBOXCGROUPONLY_QEMU := false
DEFENABLEVCPUSPINNING_QEMU := false
ifeq ($(ARCH), s390x)
VMROOTFSDRIVER_QEMU := virtio-blk-ccw
DEFBLOCKSTORAGEDRIVER_QEMU := virtio-blk-ccw
@@ -593,6 +594,7 @@ USER_VARS += DEFSANDBOXCGROUPONLY_DB
USER_VARS += DEFSANDBOXCGROUPONLY_FC
USER_VARS += DEFSANDBOXCGROUPONLY_CLH
USER_VARS += DEFSANDBOXCGROUPONLY_REMOTE
USER_VARS += DEFENABLEVCPUSPINNING_QEMU
USER_VARS += DEFSTATICRESOURCEMGMT_DB
USER_VARS += DEFSTATICRESOURCEMGMT_FC
USER_VARS += DEFSTATICRESOURCEMGMT_CLH

View File

@@ -706,6 +706,11 @@ agent_name = "@PROJECT_TYPE@"
# (default: true)
disable_guest_seccomp = @DEFDISABLEGUESTSECCOMP@
# vCPUs pinning settings
# if enabled, each vCPU thread will be scheduled to a fixed CPU
# qualified condition: num(vCPU threads) == num(CPUs in sandbox's CPUSet)
enable_vcpus_pinning = @DEFENABLEVCPUSPINNING_QEMU@
# If enabled, the runtime will create opentracing.io traces and spans.
# (See https://www.jaegertracing.io/docs/getting-started).
# (default: disabled)

View File

@@ -703,6 +703,11 @@ agent_name = "@PROJECT_TYPE@"
# (default: true)
disable_guest_seccomp = @DEFDISABLEGUESTSECCOMP@
# vCPUs pinning settings
# if enabled, each vCPU thread will be scheduled to a fixed CPU
# qualified condition: num(vCPU threads) == num(CPUs in sandbox's CPUSet)
enable_vcpus_pinning = @DEFENABLEVCPUSPINNING_QEMU@
# If enabled, the runtime will create opentracing.io traces and spans.
# (See https://www.jaegertracing.io/docs/getting-started).
# (default: disabled)

View File

@@ -586,6 +586,11 @@ agent_name = "@PROJECT_TYPE@"
# (default: true)
disable_guest_seccomp = @DEFDISABLEGUESTSECCOMP@
# vCPUs pinning settings
# if enabled, each vCPU thread will be scheduled to a fixed CPU
# qualified condition: num(vCPU threads) == num(CPUs in sandbox's CPUSet)
enable_vcpus_pinning = @DEFENABLEVCPUSPINNING_QEMU@
# If enabled, the runtime will create opentracing.io traces and spans.
# (See https://www.jaegertracing.io/docs/getting-started).
# (default: disabled)

View File

@@ -628,6 +628,11 @@ agent_name="@PROJECT_TYPE@"
# (default: true)
disable_guest_seccomp = @DEFDISABLEGUESTSECCOMP@
# vCPUs pinning settings
# if enabled, each vCPU thread will be scheduled to a fixed CPU
# qualified condition: num(vCPU threads) == num(CPUs in sandbox's CPUSet)
enable_vcpus_pinning = @DEFENABLEVCPUSPINNING_QEMU@
# If enabled, the runtime will create opentracing.io traces and spans.
# (See https://www.jaegertracing.io/docs/getting-started).
# (default: disabled)

View File

@@ -604,6 +604,11 @@ agent_name="@PROJECT_TYPE@"
# (default: true)
disable_guest_seccomp = @DEFDISABLEGUESTSECCOMP@
# vCPUs pinning settings
# if enabled, each vCPU thread will be scheduled to a fixed CPU
# qualified condition: num(vCPU threads) == num(CPUs in sandbox's CPUSet)
enable_vcpus_pinning = @DEFENABLEVCPUSPINNING_QEMU@
# If enabled, the runtime will create opentracing.io traces and spans.
# (See https://www.jaegertracing.io/docs/getting-started).
# (default: disabled)

View File

@@ -6,6 +6,7 @@ edition = { workspace = true }
license = { workspace = true }
[dev-dependencies]
rstest = { workspace = true }
tempfile = { workspace = true }
# Local dev-dependencies

View File

@@ -6,8 +6,10 @@
use serde::{Deserialize, Serialize};
/// Persisted cgroup state for a sandbox, restored across runtime restarts.
///
/// `#[serde(default)]` lets older persisted state (missing the newer
/// fields) deserialize with each field's `Default` value.
#[derive(Serialize, Deserialize, Default)]
#[serde(default)]
pub struct CgroupState {
    /// Sandbox cgroup path.
    pub path: Option<String>,
    /// Overhead cgroup path (for runtime processes kept out of the sandbox cgroup).
    pub overhead_path: Option<String>,
    /// Whether kata processes are managed only in the sandbox cgroup.
    pub sandbox_cgroup_only: bool,
    /// Whether vCPU-thread pinning is enabled for this sandbox.
    pub enable_vcpus_pinning: bool,
}

View File

@@ -29,6 +29,7 @@ pub struct CgroupConfig {
pub path: String,
pub overhead_path: String,
pub sandbox_cgroup_only: bool,
pub enable_vcpus_pinning: bool,
}
impl CgroupConfig {
@@ -59,10 +60,13 @@ impl CgroupConfig {
toml_config.runtime.sandbox_cgroup_only
};
let enable_vcpus_pinning = toml_config.runtime.enable_vcpus_pinning;
Ok(Self {
path,
overhead_path,
sandbox_cgroup_only,
enable_vcpus_pinning,
})
}
@@ -80,6 +84,7 @@ impl CgroupConfig {
path: path.clone(),
overhead_path: overhead_path.clone(),
sandbox_cgroup_only: state.sandbox_cgroup_only,
enable_vcpus_pinning: state.enable_vcpus_pinning,
})
}
}

View File

@@ -78,6 +78,7 @@ impl Persist for CgroupsResource {
path: Some(self.cgroup_config.path.clone()),
overhead_path: Some(self.cgroup_config.overhead_path.clone()),
sandbox_cgroup_only: self.cgroup_config.sandbox_cgroup_only,
enable_vcpus_pinning: self.cgroup_config.enable_vcpus_pinning,
})
}

View File

@@ -6,11 +6,15 @@
use std::collections::{HashMap, HashSet};
use std::process;
use std::str::FromStr;
use anyhow::{anyhow, Context, Result};
use cgroups_rs::manager::is_systemd_cgroup;
use cgroups_rs::{CgroupPid, FsManager, Manager, SystemdManager};
use hypervisor::Hypervisor;
use hypervisor::{Hypervisor, VcpuThreadIds};
use kata_types::cpu::CpuSet;
use nix::sched::{sched_setaffinity, CpuSet as NixCpuSet};
use nix::unistd::Pid;
use oci_spec::runtime::{LinuxCpu, LinuxCpuBuilder, LinuxResources, LinuxResourcesBuilder};
use crate::cgroups::utils::get_tgid_from_pid;
@@ -24,6 +28,14 @@ pub(crate) struct CgroupsResourceInner {
resources: HashMap<String, LinuxResources>,
sandbox_cgroup: CgroupManager,
overhead_cgroup: Option<CgroupManager>,
/// User-facing config knob: whether vCPU pinning is allowed at all.
/// Comes from TOML `enable_vcpus_pinning` or the per-pod annotation.
enable_vcpus_pinning: bool,
/// Runtime state: whether pinning is currently active. Pinning is only
/// turned on when `enable_vcpus_pinning` is true *and* the vCPU count
/// matches the sandbox cpuset size. Tracked so we know when to reset
/// threads back to the full cpuset after a mismatch.
is_vcpus_pinning_on: bool,
}
impl CgroupsResourceInner {
@@ -89,6 +101,8 @@ impl CgroupsResourceInner {
resources: HashMap::new(),
sandbox_cgroup,
overhead_cgroup,
enable_vcpus_pinning: config.enable_vcpus_pinning,
is_vcpus_pinning_on: false,
})
}
@@ -100,6 +114,8 @@ impl CgroupsResourceInner {
resources: HashMap::new(),
sandbox_cgroup,
overhead_cgroup,
enable_vcpus_pinning: config.enable_vcpus_pinning,
is_vcpus_pinning_on: false,
})
}
}
@@ -140,8 +156,10 @@ impl CgroupsResourceInner {
Ok(resources)
}
async fn move_vcpus_to_sandbox_cgroup(&mut self, hypervisor: &dyn Hypervisor) -> Result<usize> {
let hv_pids = hypervisor.get_thread_ids().await?;
async fn move_vcpus_to_sandbox_cgroup(
&mut self,
hv_pids: &VcpuThreadIds,
) -> Result<usize> {
let mut pids = hv_pids.vcpus.values();
// Use threaded mode only in cgroup v1 + cgroupfs
@@ -169,6 +187,20 @@ impl CgroupsResourceInner {
}
async fn update_sandbox_cgroups(&mut self, hypervisor: &dyn Hypervisor) -> Result<bool> {
let needs_thread_ids =
self.overhead_cgroup.is_some() || self.enable_vcpus_pinning;
let thread_ids = if needs_thread_ids {
Some(
hypervisor
.get_thread_ids()
.await
.context("get vCPU thread IDs")?,
)
} else {
None
};
// The runtime is under overhead cgroup if available. The
// hypervisor as a child of the runtime is under the overhead
// cgroup by default. We should move VMM process/vCPU threads to
@@ -176,7 +208,7 @@ impl CgroupsResourceInner {
// resources.
if self.overhead_cgroup.is_some() {
let vcpu_num = self
.move_vcpus_to_sandbox_cgroup(hypervisor)
.move_vcpus_to_sandbox_cgroup(thread_ids.as_ref().unwrap())
.await
.context("move vcpus to sandbox cgroup")?;
// The cgroup managers will not create cgroups if no processes
@@ -191,8 +223,114 @@ impl CgroupsResourceInner {
let sandbox_resources = self.collect_resources().context("collect resources")?;
self.sandbox_cgroup.set(&sandbox_resources).context("set")?;
if let Some(thread_ids) = thread_ids {
self.check_vcpus_pinning(thread_ids)
.context("check vCPUs pinning")?;
}
Ok(true)
}
/// Compute the union of all containers' OCI `cpuset.cpus` values.
///
/// Containers without a cpu resource section are skipped; a cpuset
/// string that fails to parse is logged and ignored rather than
/// aborting the whole computation.
fn collect_sandbox_cpuset(&self) -> CpuSet {
    let mut union = CpuSet::new();
    let cpus_strings = self
        .resources
        .values()
        .filter_map(|res| res.cpu().as_ref().and_then(|c| c.cpus().as_deref()));
    for cpus_str in cpus_strings {
        match CpuSet::from_str(cpus_str) {
            Ok(parsed) => union.extend(&parsed),
            Err(e) => warn!(
                sl!(),
                "vCPU pinning: failed to parse cpuset \"{}\": {}", cpus_str, e
            ),
        }
    }
    union
}
/// Pin one thread (identified by its TID) to the given set of host CPUs
/// via `sched_setaffinity(2)`.
fn set_thread_affinity(tid: u32, cpus: &[u32]) -> Result<()> {
    let mask = build_nix_cpuset(cpus)?;
    let pid = Pid::from_raw(tid as i32);
    match sched_setaffinity(pid, &mask) {
        Ok(()) => Ok(()),
        Err(e) => Err(anyhow!(
            "sched_setaffinity failed for thread {} to cpus {:?}: {}",
            tid,
            cpus,
            e
        )),
    }
}
/// Apply, skip, or reset vCPU-thread pinning based on current state.
///
/// When the feature is enabled and the number of vCPU threads equals the
/// number of CPUs in the sandbox cpuset, vCPU i is pinned to cpuset[i].
/// On any count mismatch, previously applied pinning is undone so all
/// vCPU threads float across the full cpuset again. `is_vcpus_pinning_on`
/// tracks whether pinning is currently applied.
fn check_vcpus_pinning(&mut self, thread_ids: VcpuThreadIds) -> Result<()> {
    // Feature disabled via TOML config or per-pod annotation: nothing to do.
    if !self.enable_vcpus_pinning {
        return Ok(());
    }

    let cpuset = self.collect_sandbox_cpuset();
    // NOTE(review): assumes CpuSet::iter yields CPU ids in ascending
    // order, so cpuset_slice[i] is the i-th lowest CPU — confirm.
    let cpuset_slice: Vec<u32> = cpuset.iter().copied().collect();
    let num_vcpus = thread_ids.vcpus.len();
    let num_cpus = cpuset_slice.len();

    // 1:1 pinning is only possible when both counts are non-zero and equal.
    if num_vcpus == 0 || num_cpus == 0 || num_vcpus != num_cpus {
        if num_vcpus == 0 {
            info!(sl!(), "vCPU pinning: no vCPU threads found, skipping");
        } else if num_cpus == 0 {
            info!(sl!(), "vCPU pinning: no cpuset configured, skipping");
        } else {
            info!(
                sl!(),
                "vCPU pinning: vCPU count ({}) != cpuset size ({}), pinning not possible",
                num_vcpus,
                num_cpus
            );
        }
        // If pinning was active before this mismatch, release it so no
        // thread stays stuck on a single CPU; skipped when there are no
        // threads left to reset.
        if self.is_vcpus_pinning_on && num_vcpus > 0 {
            info!(sl!(), "vCPU pinning: resetting previous pinning");
            self.reset_vcpus_pinning(&thread_ids.vcpus, &cpuset_slice)?;
            self.is_vcpus_pinning_on = false;
        }
        return Ok(());
    }

    // Pin vCPU i to cpuset_slice[i] (both sorted by index)
    let mut sorted_vcpus: Vec<(u32, u32)> = thread_ids.vcpus.into_iter().collect();
    sorted_vcpus.sort_by_key(|(idx, _)| *idx);
    for (i, (_vcpu_idx, tid)) in sorted_vcpus.iter().enumerate() {
        if let Err(e) = Self::set_thread_affinity(*tid, &cpuset_slice[i..i + 1]) {
            // On failure, reset all pinning and propagate the error
            // (reset is best-effort: its own error is deliberately dropped
            // so the original pin failure is what the caller sees).
            let all_vcpus: HashMap<u32, u32> = sorted_vcpus.iter().copied().collect();
            let _ = self.reset_vcpus_pinning(&all_vcpus, &cpuset_slice);
            return Err(e).context(format!(
                "failed to pin vCPU thread {} to CPU {}",
                tid, cpuset_slice[i]
            ));
        }
    }

    // All threads pinned successfully; remember so a later mismatch
    // knows it must reset.
    self.is_vcpus_pinning_on = true;
    info!(
        sl!(),
        "vCPU pinning: pinned {} vCPU threads to cpuset {:?}", num_vcpus, cpuset_slice
    );
    Ok(())
}
/// Re-pin every vCPU thread to the full cpuset so threads may float
/// across all of its CPUs again. A no-op when the cpuset is empty
/// (an empty affinity mask would be invalid).
fn reset_vcpus_pinning(
    &self,
    vcpus: &HashMap<u32, u32>,
    cpuset_slice: &[u32],
) -> Result<()> {
    if cpuset_slice.is_empty() {
        return Ok(());
    }
    vcpus.values().try_for_each(|tid| {
        Self::set_thread_affinity(*tid, cpuset_slice)
            .with_context(|| format!("failed to reset vCPU thread {} affinity", tid))
    })
}
}
impl CgroupsResourceInner {
@@ -276,6 +414,17 @@ impl CgroupsResourceInner {
}
}
/// Build a `NixCpuSet` affinity mask from a slice of CPU ids.
///
/// Fails if any id is beyond the mask's capacity.
fn build_nix_cpuset(cpus: &[u32]) -> Result<NixCpuSet> {
    cpus.iter().try_fold(NixCpuSet::new(), |mut mask, cpu_id| {
        mask.set(*cpu_id as usize)
            .map_err(|e| anyhow!("failed to set CPU {} in cpuset: {}", cpu_id, e))?;
        Ok(mask)
    })
}
/// Copy cpu.cpus and cpu.mems from the given resources to new resources.
fn new_cpuset_resources(resources: &LinuxResources) -> Result<LinuxResources> {
let cpu = resources.cpu();
@@ -296,3 +445,113 @@ fn new_cpuset_resources(resources: &LinuxResources) -> Result<LinuxResources> {
Ok(resources)
}
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;

    // Build OCI LinuxResources with only the cpuset.cpus field populated.
    fn make_resources_with_cpus(cpus: &str) -> LinuxResources {
        let cpu = LinuxCpuBuilder::default()
            .cpus(cpus.to_string())
            .build()
            .unwrap();
        LinuxResourcesBuilder::default()
            .cpu(cpu)
            .build()
            .unwrap()
    }

    // Construct a minimal CgroupsResourceInner for unit tests: empty
    // container resources, a throwaway fs cgroup manager, no overhead
    // cgroup, pinning not yet applied.
    fn make_inner_for_test(enable_pinning: bool) -> CgroupsResourceInner {
        CgroupsResourceInner {
            resources: HashMap::new(),
            sandbox_cgroup: Box::new(
                FsManager::new("test_sandbox_cgroup_pinning").unwrap(),
            ),
            overhead_cgroup: None,
            enable_vcpus_pinning: enable_pinning,
            is_vcpus_pinning_on: false,
        }
    }

    // collect_sandbox_cpuset must return the union of all containers'
    // cpusets, handling ranges, lists, disjoint and overlapping sets.
    #[rstest]
    #[case::empty(vec![], vec![])]
    #[case::single_range(vec![("c1", "0-3")], vec![0, 1, 2, 3])]
    #[case::single_list(vec![("c1", "0,2,4")], vec![0, 2, 4])]
    #[case::multi_container_disjoint(
        vec![("c1", "0,1"), ("c2", "2,3")],
        vec![0, 1, 2, 3]
    )]
    #[case::multi_container_overlapping(
        vec![("c1", "0-2"), ("c2", "1-3")],
        vec![0, 1, 2, 3]
    )]
    #[case::three_containers(
        vec![("c1", "0"), ("c2", "4-6"), ("c3", "2")],
        vec![0, 2, 4, 5, 6]
    )]
    fn test_collect_sandbox_cpuset(
        #[case] containers: Vec<(&str, &str)>,
        #[case] expected: Vec<u32>,
    ) {
        let mut inner = make_inner_for_test(true);
        for (cid, cpus) in containers {
            inner
                .resources
                .insert(cid.to_string(), make_resources_with_cpus(cpus));
        }
        let cpuset = inner.collect_sandbox_cpuset();
        let cpus: Vec<u32> = cpuset.iter().copied().collect();
        assert_eq!(cpus, expected);
    }

    // A container without a cpu resource section contributes nothing.
    #[test]
    fn test_collect_sandbox_cpuset_no_cpu_field() {
        let mut inner = make_inner_for_test(true);
        let resources = LinuxResourcesBuilder::default().build().unwrap();
        inner.resources.insert("c1".to_string(), resources);
        let cpuset = inner.collect_sandbox_cpuset();
        assert!(cpuset.is_empty());
    }

    // build_nix_cpuset must set exactly the requested CPU ids and
    // leave every other bit clear.
    #[rstest]
    #[case::specific_cpus(&[0, 2, 4], &[0, 2, 4], &[1, 3])]
    #[case::contiguous(&[0, 1, 2, 3], &[0, 1, 2, 3], &[4, 5])]
    #[case::single(&[7], &[7], &[0, 6])]
    fn test_build_nix_cpuset(
        #[case] input: &[u32],
        #[case] expected_set: &[u32],
        #[case] expected_unset: &[u32],
    ) {
        let cpuset = build_nix_cpuset(input).unwrap();
        for cpu in expected_set {
            assert!(
                cpuset.is_set(*cpu as usize).unwrap(),
                "CPU {} should be set",
                cpu
            );
        }
        for cpu in expected_unset {
            assert!(
                !cpuset.is_set(*cpu as usize).unwrap(),
                "CPU {} should not be set",
                cpu
            );
        }
    }

    // An empty input slice yields an all-clear mask.
    #[test]
    fn test_build_nix_cpuset_empty() {
        let cpuset = build_nix_cpuset(&[]).unwrap();
        assert!(!cpuset.is_set(0).unwrap());
    }

    // Regardless of the config knob, pinning must start inactive;
    // it is only turned on later by check_vcpus_pinning.
    #[rstest]
    #[case::disabled(false, false)]
    #[case::enabled(true, false)]
    fn test_pinning_initial_state(#[case] enable: bool, #[case] expected_on: bool) {
        let inner = make_inner_for_test(enable);
        assert_eq!(inner.enable_vcpus_pinning, enable);
        assert_eq!(inner.is_vcpus_pinning_on, expected_on);
    }
}

View File

@@ -73,6 +73,7 @@ container_pipe_size = 2
enable_debug = true
internetworking_model="macvtap"
disable_guest_seccomp=false
enable_vcpus_pinning = false
enable_tracing = true
jaeger_endpoint = "localhost:1234"
jaeger_user = "user"