agent-ctl: Add option --vm to boot pod VM for testing.

This change introduces a new command line option `--vm`
to boot up a pod VM for testing. The tool connects with
kata agent running inside the VM to send the test commands.
The tool uses `hypervisor` crates from runtime-rs for VM
lifecycle management. Current implementation supports
Qemu & Cloud Hypervisor as VMMs.

In summary:
- tool parses the VMM specific runtime-rs kata config file in
/opt/kata/share/defaults/kata-containers/runtime-rs/*
- prepares and starts a VM using runtime-rs::hypervisor vm APIs
- retrieves agent's server address to setup connection
- tests the requested commands & shutdown the VM

Fixes #11566

Signed-off-by: Sumedh Alok Sharma <sumsharma@microsoft.com>
This commit is contained in:
Sumedh Alok Sharma 2025-07-10 17:43:14 +00:00 committed by stevenhorsman
parent e451e3dcd0
commit 0398073c55
10 changed files with 1022 additions and 146 deletions

File diff suppressed because it is too large Load Diff

View File

@ -30,7 +30,7 @@ rand = "0.8.4"
protobuf = "3.2.0"
log = "0.4.22"
nix = "0.23.0"
nix = "0.24.2"
libc = "0.2.112"
# XXX: Must be the same as the version used by the agent
ttrpc = "0.8.4"
@ -49,6 +49,11 @@ image-rs = { git = "https://github.com/confidential-containers/guest-components"
"signature-cosign-rustls",
] }
kata-types = { path = "../../libs/kata-types" }
# hypervisor crate from runtime-rs
hypervisor = { path = "../../runtime-rs/crates/hypervisor", features = ["cloud-hypervisor"]}
safe-path = { path = "../../libs/safe-path" }
tokio = { version = "1.44.2", features = ["signal"] }

View File

@ -7,14 +7,15 @@
use crate::types::*;
use crate::utils;
use crate::vm;
use anyhow::{anyhow, Result};
use byteorder::ByteOrder;
use nix::sys::socket::{connect, socket, AddressFamily, SockAddr, SockFlag, SockType, UnixAddr};
use nix::sys::socket::{connect, socket, AddressFamily, SockFlag, SockType, UnixAddr, VsockAddr};
use protocols::agent::*;
use protocols::agent_ttrpc::*;
use protocols::health::*;
use protocols::health_ttrpc::*;
use slog::{debug, info};
use slog::{debug, info, warn};
use std::convert::TryFrom;
use std::fs;
use std::io::Write; // XXX: for flush()
@ -107,6 +108,12 @@ const METADATA_CFG_NS: &str = "agent-ctl-cfg";
// automatically.
const AUTO_VALUES_CFG_NAME: &str = "auto-values";
// Retry count and dial timeout to try connecting to the agent
// Static value taken from runtime-rs configuration calculation
// # Retry times = reconnect_timeout_ms / dial_timeout_ms (default: 300)
const RETRY_AGENT_CONNECT: u64 = 300;
const DIAL_TIMEOUT: u64 = 10;
static AGENT_CMDS: &[AgentCmd] = &[
AgentCmd {
name: "AddARPNeighbors",
@ -403,25 +410,43 @@ fn get_builtin_cmd_func(name: &str) -> Result<BuiltinCmdFp> {
}
fn client_create_vsock_fd(cid: libc::c_uint, port: u32) -> Result<RawFd> {
let fd = socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)
.map_err(|e| anyhow!(e))?;
let sock_addr = VsockAddr::new(cid, port);
let sock_addr = SockAddr::new_vsock(cid, port);
for i in 0..RETRY_AGENT_CONNECT {
let fd = socket(
AddressFamily::Vsock,
SockType::Stream,
SockFlag::SOCK_CLOEXEC,
None,
)
.map_err(|e| anyhow!(e))?;
connect(fd, &sock_addr).map_err(|e| anyhow!(e))?;
// Connect the socket to vsock server.
match connect(fd, &sock_addr) {
Ok(_) => return Ok(fd),
Err(e) => {
debug!(
sl!(),
"Failed to connect to vsock in attempt:{} error:{:?}", i, e
);
sleep(Duration::from_millis(DIAL_TIMEOUT));
continue;
}
}
}
Ok(fd)
Err(anyhow!("Failed to establish vsock connection with agent"))
}
// Setup the existing stream by making a Hybrid VSOCK host-initiated
// connection request to the Hybrid VSOCK-capable hypervisor (CLH or FC),
// asking it to route the connection to the Kata Agent running inside the VM.
fn setup_hybrid_vsock(mut stream: &UnixStream, hybrid_vsock_port: u64) -> Result<()> {
fn setup_hybrid_vsock(path: &str, hybrid_vsock_port: u64) -> Result<UnixStream> {
debug!(
sl!(),
"setup_hybrid_vsock path:{} port: {}", path, hybrid_vsock_port
);
// Challenge message sent to the Hybrid VSOCK capable hypervisor asking
// for a connection to a real VSOCK server running in the VM on the
// port specified as part of this message.
@ -431,39 +456,39 @@ fn setup_hybrid_vsock(mut stream: &UnixStream, hybrid_vsock_port: u64) -> Result
// hypervisor informing the client that the CONNECT_CMD was successful.
const OK_CMD: &str = "OK";
// Contact the agent by dialing it's port number and
// waiting for the hybrid vsock hypervisor to route the call for us ;)
//
// See: https://github.com/firecracker-microvm/firecracker/blob/main/docs/vsock.md#host-initiated-connections
let msg = format!("{} {}\n", CONNECT_CMD, hybrid_vsock_port);
for i in 0..RETRY_AGENT_CONNECT {
let mut stream = UnixStream::connect(path)?;
// Contact the agent by dialing it's port number and
// waiting for the hybrid vsock hypervisor to route the call for us ;)
//
// See: https://github.com/firecracker-microvm/firecracker/blob/main/docs/vsock.md#host-initiated-connections
let msg = format!("{} {}\n", CONNECT_CMD, hybrid_vsock_port);
stream.write_all(msg.as_bytes())?;
stream.write_all(msg.as_bytes())?;
// Now, see if we get the expected response
let mut reader = BufReader::new(&mut stream);
// Now, see if we get the expected response
let stream_reader = stream.try_clone()?;
let mut reader = BufReader::new(&stream_reader);
let mut msg = String::new();
reader.read_line(&mut msg)?;
let mut msg = String::new();
reader.read_line(&mut msg)?;
if msg.starts_with(OK_CMD) {
let response = msg
.strip_prefix(OK_CMD)
.ok_or(format!("invalid response: {:?}", msg))
.map_err(|e| anyhow!(e))?
.trim();
if msg.starts_with(OK_CMD) {
let response = msg
.strip_prefix(OK_CMD)
.ok_or(format!("invalid response: {:?}", msg))
.map_err(|e| anyhow!(e))?
.trim();
debug!(sl!(), "Hybrid VSOCK host-side port: {:?}", response);
} else {
return Err(anyhow!(
"failed to setup Hybrid VSOCK connection: response was: {:?}",
msg
));
// The Unix stream is now connected directly to the VSOCK socket
// the Kata agent is listening to in the VM.
debug!(sl!(), "Hybrid VSOCK host-side port: {:?}", response);
return Ok(stream);
} else {
debug!(sl!(), "attempt:{} message: {:?}", i, msg);
sleep(Duration::from_millis(DIAL_TIMEOUT));
continue;
}
}
// The Unix stream is now connected directly to the VSOCK socket
// the Kata agent is listening to in the VM.
Ok(())
Err(anyhow!("Failed to establish hvsock connection with agent"))
}
fn create_ttrpc_client(
@ -524,27 +549,21 @@ fn create_ttrpc_client(
}
};
let sock_addr = SockAddr::Unix(unix_addr);
connect(socket_fd, &sock_addr).map_err(|e| {
connect(socket_fd, &unix_addr).map_err(|e| {
anyhow!(e).context("Failed to connect to Unix Domain abstract socket")
})?;
socket_fd
} else if hybrid_vsock {
let stream = setup_hybrid_vsock(&path, hybrid_vsock_port)?;
stream.into_raw_fd()
} else {
let stream = match UnixStream::connect(path) {
Ok(s) => s,
Err(e) => {
return Err(
anyhow!(e).context("failed to create named UNIX Domain stream socket")
)
Err(err) => {
return Err(anyhow!("failed to setup unix stream: {:?}", err));
}
};
if hybrid_vsock {
setup_hybrid_vsock(&stream, hybrid_vsock_port)?
}
stream.into_raw_fd()
}
}
@ -605,7 +624,7 @@ fn announce(cfg: &Config) {
info!(sl!(), "announce"; "config" => format!("{:?}", cfg));
}
pub fn client(cfg: &Config, commands: Vec<&str>) -> Result<()> {
pub fn client(cfg: &mut Config, commands: Vec<&str>) -> Result<()> {
if commands.len() == 1 && commands[0].eq("list") {
println!("Built-in commands:\n");
@ -628,6 +647,68 @@ pub fn client(cfg: &Config, commands: Vec<&str>) -> Result<()> {
announce(cfg);
let vm_ref = handle_vm(cfg)?;
info!(sl!(), "run commands");
let result = run_commands(cfg, commands);
// stop the vm if booted
if vm_ref.is_some() {
info!(sl!(), "stopping test vm");
// TODO: The error handling here is for cloud-hypervisor.
// We use tokio::runtime to call the async operations of
// runtime-rs::crates::hypervisor::_vm
// These methods can spawn some additional functions, ex
// `clh::inner_hypervisor::cloud_hypervisor_log_output`
// But since we return from the tokio::runtime block
// the runtime is dropped. During stop_vm call, cloud hypervisor
// waits for the logger task which is in cancelled state as a result.
match vm::remove_vm(vm_ref.unwrap()) {
Ok(_) => info!(sl!(), "Successfully shut down test vm"),
Err(e) => warn!(sl!(), "Error shutting down vm:{:?}", e),
}
}
result.map_err(|e| anyhow!(e))
}
fn handle_vm(cfg: &mut Config) -> Result<Option<vm::TestVm>> {
info!(sl!(), "handle vm request");
// Return if no vm requested
if cfg.hypervisor_name.is_empty() {
return Ok(None);
}
// Boot the test vm
let vm_instance = vm::setup_vm(&cfg.hypervisor_name)?;
info!(
sl!(),
"booted test vm with hypervisor: {:?}", vm_instance.hypervisor_name
);
// set the vsock server address for connecting with ttrpc server
if !vm_instance.socket_addr.is_empty() {
match vm_instance.hybrid_vsock {
true => {
// hybrid vsock URI expects unix prefix
let addr_fields: Vec<&str> = vm_instance.socket_addr.split("://").collect();
cfg.server_address = format!("{}://{}", "unix", addr_fields[1]);
cfg.hybrid_vsock = true;
}
false => {
let addr = vm_instance.socket_addr.clone();
cfg.server_address = format!("{}:{}", addr, 1024);
cfg.hybrid_vsock = false;
}
}
}
info!(sl!(), "socket server addr: {}", cfg.server_address);
Ok(Some(vm_instance))
}
fn run_commands(cfg: &Config, commands: Vec<&str>) -> Result<()> {
// Create separate connections for each of the services provided
// by the agent.
let client = kata_service_agent(

View File

@ -25,6 +25,7 @@ mod image;
mod rpc;
mod types;
mod utils;
mod vm;
const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info;
@ -73,6 +74,10 @@ fn make_examples_text(program_name: &str) -> String {
# Abstract socket
$ {program} connect --server-address "{abstract_server_address}" --cmd Check
- Boot up a test VM and connect to the agent (socket address determined by the tool):
$ {program} connect --vm qemu --cmd Check
- Query the agent environment:
$ {program} connect --server-address "{vsock_server_address}" --cmd GetGuestDetails
@ -140,12 +145,25 @@ fn connect(name: &str, global_args: clap::ArgMatches) -> Result<()> {
let interactive = args.contains_id("interactive");
let ignore_errors = args.contains_id("ignore-errors");
// boot-up a test vm for testing commands
let hypervisor_name = args
.get_one::<String>("vm")
.map(|s| s.as_str())
.unwrap_or_default()
.to_string();
let server_address = args
.get_one::<String>("server-address")
.map(|s| s.as_str())
.ok_or_else(|| anyhow!("need server adddress"))?
.unwrap_or_default()
.to_string();
// if vm is requested, we retrieve the server
// address after the boot-up is completed
if hypervisor_name.is_empty() && server_address.is_empty() {
return Err(anyhow!("need server address"));
}
let mut commands: Vec<&str> = Vec::new();
if !interactive {
@ -187,7 +205,7 @@ fn connect(name: &str, global_args: clap::ArgMatches) -> Result<()> {
let hybrid_vsock = args.contains_id("hybrid-vsock");
let no_auto_values = args.contains_id("no-auto-values");
let cfg = Config {
let mut cfg = Config {
server_address,
bundle_dir,
timeout_nano,
@ -196,9 +214,10 @@ fn connect(name: &str, global_args: clap::ArgMatches) -> Result<()> {
hybrid_vsock,
ignore_errors,
no_auto_values,
hypervisor_name,
};
let result = rpc::run(&logger, &cfg, commands);
let result = rpc::run(&logger, &mut cfg, commands);
result.map_err(|e| anyhow!(e))
}
@ -283,6 +302,12 @@ fn real_main() -> Result<()> {
.help("timeout value as nanoseconds or using human-readable suffixes (0 [forever], 99ns, 30us, 2ms, 5s, 7m, etc)")
.value_name("human-time"),
)
.arg(
Arg::new("vm")
.long("vm")
.help("boot a pod vm for testing")
.value_name("HYPERVISOR"),
)
)
.subcommand(
Command::new("generate-cid")

View File

@ -11,7 +11,7 @@ use slog::{o, Logger};
use crate::client::client;
use crate::types::Config;
pub fn run(logger: &Logger, cfg: &Config, commands: Vec<&str>) -> Result<()> {
pub fn run(logger: &Logger, cfg: &mut Config, commands: Vec<&str>) -> Result<()> {
// Maintain the global logger for the duration of the ttRPC comms
let _guard = slog_scope::set_global_logger(logger.new(o!("subsystem" => "rpc")));

View File

@ -19,6 +19,7 @@ pub struct Config {
pub hybrid_vsock: bool,
pub ignore_errors: bool,
pub no_auto_values: bool,
pub hypervisor_name: String,
}
// CopyFile input struct

View File

@ -0,0 +1,56 @@
// Copyright (c) 2024 Microsoft Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
// Description: Boot UVM for testing container storages/volumes.
use anyhow::{anyhow, Context, Result};
use hypervisor::Hypervisor;
use kata_types::config::{hypervisor::HYPERVISOR_NAME_CH, hypervisor::HYPERVISOR_NAME_QEMU};
use slog::info;
use std::sync::Arc;
mod vm_ops;
mod vm_utils;
lazy_static! {
pub(crate) static ref SUPPORTED_VMMS: Vec<&'static str> =
vec![HYPERVISOR_NAME_CH, HYPERVISOR_NAME_QEMU];
}
#[derive(Clone)]
pub struct TestVm {
pub hypervisor_name: String,
pub hypervisor_instance: Arc<dyn Hypervisor>,
pub socket_addr: String,
pub hybrid_vsock: bool,
}
// Helper method to boot a test pod VM
pub fn setup_vm(hypervisor_name: &str) -> Result<TestVm> {
info!(
sl!(),
"booting a pod vm using hypervisor:{:?}", hypervisor_name
);
if !SUPPORTED_VMMS.contains(&hypervisor_name) {
return Err(anyhow!("Unsupported hypervisor:{}", hypervisor_name));
}
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(vm_ops::boot_vm(hypervisor_name))
.context("booting the test vm")
}
// Helper method to stop a test pod VM
pub fn remove_vm(instance: TestVm) -> Result<()> {
info!(sl!(), "Stopping booted pod vm");
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(vm_ops::stop_vm(instance.hypervisor_instance))
.context("stopping the test vm")
}

View File

@ -0,0 +1,166 @@
// Copyright (c) 2024 Microsoft Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
// Description: Boot UVM for testing container storages/volumes.
use crate::vm::{vm_utils, TestVm};
use anyhow::{anyhow, Context, Result};
use hypervisor::{
ch::CloudHypervisor,
device::{
device_manager::{do_handle_device, DeviceManager},
DeviceConfig,
},
qemu::Qemu,
BlockConfig, Hypervisor, VsockConfig,
};
use kata_types::config::{
hypervisor::register_hypervisor_plugin, hypervisor::TopologyConfigInfo,
hypervisor::HYPERVISOR_NAME_CH, hypervisor::HYPERVISOR_NAME_QEMU, CloudHypervisorConfig,
QemuConfig,
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
// Clh specific configuration path
const CLH_CONFIG_PATH: &str =
"/opt/kata/share/defaults/kata-containers/runtime-rs/configuration-cloud-hypervisor.toml";
// qemu specific configuration path
const QEMU_CONFIG_PATH: &str =
"/opt/kata/share/defaults/kata-containers/runtime-rs/configuration-qemu-runtime-rs.toml";
const VM_NAME: &str = "agent-ctl-testvm";
const VM_START_TIMEOUT: i32 = 10_000;
// Boot the test vm.
// In summary, this method
// - parses hypervisor specific kata config file
// - loads hypervisor specific config
// - instantiates a hypervisor object
// - calls prepare_vm
// - instantiates device manager to handle devices
// - calls start_vm to boot pod vm
// - retrieves the agent ttrpc server socket address
pub(crate) async fn boot_vm(name: &str) -> Result<TestVm> {
let config_path;
let mut is_hybrid_vsock = false;
// Register the hypervisor config plugin
match name {
HYPERVISOR_NAME_CH => {
register_hypervisor_plugin(HYPERVISOR_NAME_CH, Arc::new(CloudHypervisorConfig::new()));
config_path = CLH_CONFIG_PATH;
is_hybrid_vsock = true;
}
&_ => {
register_hypervisor_plugin(HYPERVISOR_NAME_QEMU, Arc::new(QemuConfig::new()));
config_path = QEMU_CONFIG_PATH;
}
};
// get the kata configuration toml
let toml_config = vm_utils::load_config(config_path)?;
let hypervisor_config = toml_config
.hypervisor
.get(name)
.ok_or_else(|| anyhow!("Failed to get hypervisor config"))
.context("get hypervisor config")?;
let hypervisor: Arc<dyn Hypervisor> = match name {
HYPERVISOR_NAME_CH => {
let hyp_ch = Arc::new(CloudHypervisor::new());
hyp_ch
.set_hypervisor_config(hypervisor_config.clone())
.await;
hyp_ch
}
&_ => {
let hyp_qemu = Arc::new(Qemu::new());
hyp_qemu
.set_hypervisor_config(hypervisor_config.clone())
.await;
hyp_qemu
}
};
// prepare vm
// we do not pass any network namesapce since we dont want any
let empty_anno_map: HashMap<String, String> = HashMap::new();
hypervisor
.prepare_vm(VM_NAME, None, &empty_anno_map)
.await
.context(" prepare test vm")?;
// instantiate device manager
let topo_config = TopologyConfigInfo::new(&toml_config);
let dev_manager = Arc::new(RwLock::new(
DeviceManager::new(hypervisor.clone(), topo_config.as_ref())
.await
.context("failed to create device manager")?,
));
// For qemu, we need some additional device handling
// - vsock device
// - block device for rootfs if using image
if name.contains(HYPERVISOR_NAME_QEMU) {
add_vsock_device(dev_manager.clone())
.await
.context("qemu::adding vsock device")?;
if !hypervisor_config.boot_info.image.is_empty() {
let blk_config = BlockConfig {
path_on_host: hypervisor_config.boot_info.image.clone(),
is_readonly: true,
driver_option: hypervisor_config.boot_info.vm_rootfs_driver.clone(),
..Default::default()
};
add_block_device(dev_manager.clone(), blk_config)
.await
.context("qemu: handle rootfs")?;
}
}
// start vm
hypervisor
.start_vm(VM_START_TIMEOUT)
.await
.context("start pod vm")?;
let agent_socket_addr = hypervisor
.get_agent_socket()
.await
.context("get agent socket path")?;
// return the vm structure
Ok(TestVm {
hypervisor_name: name.to_string(),
hypervisor_instance: hypervisor,
socket_addr: agent_socket_addr,
hybrid_vsock: is_hybrid_vsock,
})
}
pub(crate) async fn stop_vm(instance: Arc<dyn Hypervisor>) -> Result<()> {
instance.stop_vm().await.context("stopping pod vm")
}
async fn add_block_device(dev_mgr: Arc<RwLock<DeviceManager>>, cfg: BlockConfig) -> Result<()> {
do_handle_device(&dev_mgr, &DeviceConfig::BlockCfg(cfg))
.await
.context("handle block device failed")?;
Ok(())
}
async fn add_vsock_device(dev_mgr: Arc<RwLock<DeviceManager>>) -> Result<()> {
let vsock_config = VsockConfig {
guest_cid: libc::VMADDR_CID_ANY,
};
do_handle_device(&dev_mgr, &DeviceConfig::VsockCfg(vsock_config))
.await
.context("handle vsock device failed")?;
Ok(())
}

View File

@ -0,0 +1,53 @@
// Copyright (c) 2025 Microsoft Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
// Description: Boot UVM for testing container storages/volumes.
use anyhow::{anyhow, Context, Result};
use kata_types::config::TomlConfig;
use slog::info;
// Helper function to parse a configuration file.
pub fn load_config(config_file: &str) -> Result<TomlConfig> {
info!(sl!(), "Load kata configuration file {}", config_file);
let (mut toml_config, _) = TomlConfig::load_from_file(config_file)
.context("Failed to load kata configuration file")?;
// Update the agent kernel params in hypervisor config
update_agent_kernel_params(&mut toml_config)?;
// validate configuration and return the error
toml_config.validate()?;
info!(sl!(), "parsed config content {:?}", &toml_config);
Ok(toml_config)
}
pub fn to_kernel_string(key: String, val: String) -> Result<String> {
if key.is_empty() && val.is_empty() {
Err(anyhow!("Empty key and value"))
} else if key.is_empty() {
Err(anyhow!("Empty key"))
} else if val.is_empty() {
Ok(key.to_string())
} else {
Ok(format!("{}{}{}", key, "=", val))
}
}
fn update_agent_kernel_params(config: &mut TomlConfig) -> Result<()> {
let mut params = vec![];
if let Ok(kv) = config.get_agent_kernel_params() {
for (k, v) in kv.into_iter() {
if let Ok(s) = to_kernel_string(k.to_owned(), v.to_owned()) {
params.push(s);
}
}
if let Some(h) = config.hypervisor.get_mut(&config.runtime.hypervisor_name) {
h.boot_info.add_kernel_params(params);
}
}
Ok(())
}

View File

@ -0,0 +1,34 @@
#!/usr/bin/env bats
# Copyright (c) 2024 Microsoft Corporation
#
# SPDX-License-Identifier: Apache-2.0
load "${BATS_TEST_DIRNAME}/../../../common.bash"
load "${BATS_TEST_DIRNAME}/../setup_common.sh"
setup_file() {
info "setup"
sudo rm qmp.sock console.sock || echo "No existing qmp.sock/console.sock"
}
@test "Test GetGuestDetails: Boot qemu pod vm and run GetGuestDetails" {
info "Boot qemu vm, establish connection with agent inside the vm and send GetGuestDetails command"
local cmds=()
cmds+=("--vm qemu -c GetGuestDetails")
run_agent_ctl "${cmds[@]}"
sudo rm qmp.sock console.sock
}
@test "Test GetGuestDetails: Boot cloud hypervisor pod vm and run GetGuestDetails" {
info "Boot cloud hypervisor vm, establish connection with agent inside the vm and send GetGuestDetails command"
local cmds=()
cmds+=("--vm cloud-hypervisor -c GetGuestDetails")
run_agent_ctl "${cmds[@]}"
}
teardown_file() {
info "teardown"
sudo rm -r /run/kata/agent-ctl-testvm || echo "Failed to clean /run/kata/agent-ctl-testvm"
sudo rm -r /run/kata-containers/ || echo "Failed to clean /run/kata-containers"
}