diff --git a/src/runtime-rs/crates/hypervisor/ch-config/src/ch_api.rs b/src/runtime-rs/crates/hypervisor/ch-config/src/ch_api.rs index b68dda4604..1bc0ed2b0f 100644 --- a/src/runtime-rs/crates/hypervisor/ch-config/src/ch_api.rs +++ b/src/runtime-rs/crates/hypervisor/ch-config/src/ch_api.rs @@ -12,66 +12,80 @@ use api_client::{ use serde::{Deserialize, Serialize}; use std::os::{fd::RawFd, unix::net::UnixStream}; +use tokio::sync::Mutex; use tokio::task; -pub async fn cloud_hypervisor_vmm_ping(mut socket: UnixStream) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response(&mut socket, "GET", "vmm.ping", None) - .map_err(|e| anyhow!(e))?; +/// Type alias for the serialized API socket shared across callers. +/// +/// All Cloud Hypervisor HTTP API calls share a single `UnixStream`. Because +/// the CH API uses HTTP/1.1 over a Unix domain socket without pipelining, +/// concurrent requests on the same stream corrupt the response framing. +/// Wrapping the socket in a `Mutex` ensures only one request-response cycle +/// is in flight at a time. +pub type ApiSocket = Mutex>; +/// Execute a CH API command while holding the API socket lock. +/// +/// Acquires the mutex, clones the socket, and runs the blocking HTTP +/// request-response in `spawn_blocking`. The mutex guard is held until +/// the blocking task completes, ensuring no concurrent API calls. +async fn api_command( + api_socket: &ApiSocket, + method: &'static str, + endpoint: &'static str, + body: Option, + fds: Option>, +) -> Result> { + let _guard = api_socket.lock().await; + let mut socket = _guard + .as_ref() + .ok_or_else(|| anyhow!("missing api socket"))? + .try_clone() + .context("clone api socket")?; + + let result = task::spawn_blocking(move || -> Result> { + let response = if let Some(fds) = fds { + simple_api_full_command_with_fds_and_response( + &mut socket, + method, + endpoint, + body.as_deref(), + &fds, + ) + } else { + simple_api_full_command_and_response(&mut socket, method, endpoint, body.as_deref()) + } + .map_err(|e| anyhow!(e))?; Ok(response) }) - .await? + .await?; + + result } -pub async fn cloud_hypervisor_vmm_shutdown(mut socket: UnixStream) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = - simple_api_full_command_and_response(&mut socket, "PUT", "vmm.shutdown", None) - .map_err(|e| anyhow!(e))?; +pub async fn cloud_hypervisor_vmm_ping(api_socket: &ApiSocket) -> Result> { + api_command(api_socket, "GET", "vmm.ping", None, None).await +} - Ok(response) - }) - .await? +pub async fn cloud_hypervisor_vmm_shutdown(api_socket: &ApiSocket) -> Result> { + api_command(api_socket, "PUT", "vmm.shutdown", None, None).await } pub async fn cloud_hypervisor_vm_create( - mut socket: UnixStream, + api_socket: &ApiSocket, cfg: VmConfig, ) -> Result> { - let serialised = serde_json::to_string_pretty(&cfg)?; - - task::spawn_blocking(move || -> Result> { - let data = Some(serialised.as_str()); - - let response = simple_api_full_command_and_response(&mut socket, "PUT", "vm.create", data) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string_pretty(&cfg)?; + api_command(api_socket, "PUT", "vm.create", Some(body), None).await } -pub async fn cloud_hypervisor_vm_start(mut socket: UnixStream) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response(&mut socket, "PUT", "vm.boot", None) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? +pub async fn cloud_hypervisor_vm_start(api_socket: &ApiSocket) -> Result> { + api_command(api_socket, "PUT", "vm.boot", None, None).await } #[allow(dead_code)] -pub async fn cloud_hypervisor_vm_stop(mut socket: UnixStream) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = - simple_api_full_command_and_response(&mut socket, "PUT", "vm.shutdown", None) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? +pub async fn cloud_hypervisor_vm_stop(api_socket: &ApiSocket) -> Result> { + api_command(api_socket, "PUT", "vm.shutdown", None, None).await } #[derive(Deserialize, Debug)] @@ -87,163 +101,73 @@ pub struct VmRemoveDeviceData { } pub async fn cloud_hypervisor_vm_blockdev_add( - mut socket: UnixStream, + api_socket: &ApiSocket, blk_config: DiskConfig, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.add-disk", - Some(&serde_json::to_string(&blk_config)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&blk_config)?; + api_command(api_socket, "PUT", "vm.add-disk", Some(body), None).await } pub async fn cloud_hypervisor_vm_netdev_add( - mut socket: UnixStream, + api_socket: &ApiSocket, net_config: NetConfig, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.add-net", - Some(&serde_json::to_string(&net_config)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&net_config)?; + api_command(api_socket, "PUT", "vm.add-net", Some(body), None).await } pub async fn cloud_hypervisor_vm_netdev_add_with_fds( - mut socket: UnixStream, + api_socket: &ApiSocket, net_config: NetConfig, request_fds: Vec, ) -> Result> { - let serialised = serde_json::to_string(&net_config)?; - - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_with_fds_and_response( - &mut socket, - "PUT", - "vm.add-net", - Some(&serialised), - &request_fds, - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&net_config)?; + api_command(api_socket, "PUT", "vm.add-net", Some(body), Some(request_fds)).await } pub async fn cloud_hypervisor_vm_device_add( - mut socket: UnixStream, + api_socket: &ApiSocket, device_config: DeviceConfig, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.add-device", - Some(&serde_json::to_string(&device_config)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&device_config)?; + api_command(api_socket, "PUT", "vm.add-device", Some(body), None).await } #[allow(dead_code)] pub async fn cloud_hypervisor_vm_device_remove( - mut socket: UnixStream, + api_socket: &ApiSocket, device_data: VmRemoveDeviceData, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.remove-device", - Some(&serde_json::to_string(&device_data)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&device_data)?; + api_command(api_socket, "PUT", "vm.remove-device", Some(body), None).await } pub async fn cloud_hypervisor_vm_fs_add( - mut socket: UnixStream, + api_socket: &ApiSocket, fs_config: FsConfig, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.add-fs", - Some(&serde_json::to_string(&fs_config)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&fs_config)?; + api_command(api_socket, "PUT", "vm.add-fs", Some(body), None).await } pub async fn cloud_hypervisor_vm_vsock_add( - mut socket: UnixStream, + api_socket: &ApiSocket, vsock_config: VsockConfig, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.add-vsock", - Some(&serde_json::to_string(&vsock_config)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&vsock_config)?; + api_command(api_socket, "PUT", "vm.add-vsock", Some(body), None).await } -pub async fn cloud_hypervisor_vm_info(mut socket: UnixStream) -> Result { - let vm_info = task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response(&mut socket, "GET", "vm.info", None) - .map_err(|e| anyhow!(format!("failed to run get vminfo with err: {:?}", e)))?; - - Ok(response) - }) - .await??; - - let vm_info = vm_info.ok_or(anyhow!("failed to get vminfo"))?; +pub async fn cloud_hypervisor_vm_info(api_socket: &ApiSocket) -> Result { + let response = api_command(api_socket, "GET", "vm.info", None, None).await?; + let vm_info = response.ok_or(anyhow!("failed to get vminfo"))?; serde_json::from_str(&vm_info).with_context(|| format!("failed to serde {vm_info}")) } pub async fn cloud_hypervisor_vm_resize( - mut socket: UnixStream, + api_socket: &ApiSocket, vmresize: VmResize, ) -> Result> { - task::spawn_blocking(move || -> Result> { - let response = simple_api_full_command_and_response( - &mut socket, - "PUT", - "vm.resize", - Some(&serde_json::to_string(&vmresize)?), - ) - .map_err(|e| anyhow!(e))?; - - Ok(response) - }) - .await? + let body = serde_json::to_string(&vmresize)?; + api_command(api_socket, "PUT", "vm.resize", Some(body), None).await } diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs index 3b9ed8a002..f4f0ee1a91 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner.rs @@ -13,8 +13,8 @@ use kata_types::capabilities::{Capabilities, CapabilityBits}; use kata_types::config::hypervisor::Hypervisor as HypervisorConfig; use kata_types::config::hypervisor::HYPERVISOR_NAME_CH; use persist::sandbox_persist::Persist; +use ch_config::ch_api::ApiSocket; use std::collections::HashMap; -use std::os::unix::net::UnixStream; use tokio::sync::watch::{channel, Receiver, Sender}; use tokio::task::JoinHandle; use tokio::{process::Child, sync::mpsc}; @@ -24,7 +24,7 @@ pub struct CloudHypervisorInner { pub(crate) state: VmmState, pub(crate) id: String, - pub(crate) api_socket: Option, + pub(crate) api_socket: ApiSocket, pub(crate) extra_args: Option>, pub(crate) config: HypervisorConfig, @@ -95,7 +95,7 @@ impl CloudHypervisorInner { let (tx, rx) = channel(true); Self { - api_socket: None, + api_socket: ApiSocket::new(None), extra_args: None, process: None, diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_device.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_device.rs index 816b18bf7d..c0c621d886 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner_device.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_device.rs @@ -143,12 +143,6 @@ impl CloudHypervisorInner { )); } - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let num_queues: usize = if device.config.queue_num > 0 { device.config.queue_num as usize } else { @@ -177,11 +171,7 @@ impl CloudHypervisorInner { ..Default::default() }; - let response = cloud_hypervisor_vm_fs_add( - socket.try_clone().context("failed to clone socket")?, - fs_config, - ) - .await?; + let response = cloud_hypervisor_vm_fs_add(&self.api_socket, fs_config).await?; if let Some(detail) = response { debug!(sl!(), "fs add response: {:?}", detail); @@ -206,23 +196,13 @@ impl CloudHypervisorInner { let sysfsdev = primary_device.sysfs_path.clone(); - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let device_config = DeviceConfig { path: PathBuf::from(sysfsdev), iommu: false, ..Default::default() }; - let response = cloud_hypervisor_vm_device_add( - socket.try_clone().context("failed to clone socket")?, - device_config, - ) - .await?; + let response = cloud_hypervisor_vm_device_add(&self.api_socket, device_config).await?; if let Some(detail) = response { debug!(sl!(), "VFIO add response: {:?}", detail); @@ -252,22 +232,12 @@ impl CloudHypervisorInner { )); } - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let clh_device_id = clh_device_id.unwrap(); let rm_data = VmRemoveDeviceData { id: clh_device_id.clone(), }; - let response = cloud_hypervisor_vm_device_remove( - socket.try_clone().context("failed to clone socket")?, - rm_data, - ) - .await?; + let response = cloud_hypervisor_vm_device_remove(&self.api_socket, rm_data).await?; if let Some(detail) = response { debug!(sl!(), "device remove response: {:?}", detail); @@ -301,11 +271,6 @@ impl CloudHypervisorInner { async fn handle_hvsock_device(&mut self, device: HybridVsockDevice) -> Result { let hvsock_config = device.config.clone(); - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; let vsock_config = VsockConfig { cid: hvsock_config.guest_cid.into(), @@ -313,11 +278,7 @@ impl CloudHypervisorInner { ..Default::default() }; - let response = cloud_hypervisor_vm_vsock_add( - socket.try_clone().context("failed to clone socket")?, - vsock_config, - ) - .await?; + let response = cloud_hypervisor_vm_vsock_add(&self.api_socket, vsock_config).await?; if let Some(detail) = response { debug!(sl!(), "hvsock add response: {:?}", detail); @@ -328,11 +289,6 @@ impl CloudHypervisorInner { async fn handle_block_device(&mut self, device: BlockDevice) -> Result { let mut block_dev = device.clone(); - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; let mut disk_config = DiskConfig::try_from(device.config.clone())?; disk_config.direct = device @@ -352,11 +308,7 @@ impl CloudHypervisorInner { ); disk_config.rate_limiter_config = block_rate_limit; - let response = cloud_hypervisor_vm_blockdev_add( - socket.try_clone().context("failed to clone socket")?, - disk_config, - ) - .await?; + let response = cloud_hypervisor_vm_blockdev_add(&self.api_socket, disk_config).await?; if let Some(detail) = response { debug!(sl!(), "blockdev add response: {:?}", detail); @@ -373,12 +325,6 @@ impl CloudHypervisorInner { async fn handle_network_device(&mut self, device: NetworkDevice) -> Result { let netdev = device.clone(); - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let mut clh_net_config = NetConfig::try_from(device.config)?; // When using fds to pass the tap device to cloud-hypervisor, tap and id fields should be None clh_net_config.tap = None; @@ -389,12 +335,8 @@ impl CloudHypervisorInner { let fds = files.iter().map(|f| f.as_raw_fd()).collect(); - let response = cloud_hypervisor_vm_netdev_add_with_fds( - socket.try_clone().context("failed to clone socket")?, - clh_net_config, - fds, - ) - .await?; + let response = + cloud_hypervisor_vm_netdev_add_with_fds(&self.api_socket, clh_net_config, fds).await?; if let Some(detail) = response { debug!(sl!(), "netdev add response: {:?}", detail); diff --git a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs index 7a3a4bb013..329f8760cb 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/inner_hypervisor.rs @@ -190,12 +190,6 @@ impl CloudHypervisorInner { let (shared_fs_devices, network_devices, host_devices, protection_device) = self.get_shared_devices().await?; - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let sandbox_path = get_sandbox_path(&self.id); create_dir_all_with_inherit_owner(sandbox_path.clone(), 0o750) @@ -232,9 +226,7 @@ impl CloudHypervisorInner { "CH specific VmConfig configuration (JSON): {:?}", serialised ); - let response = - cloud_hypervisor_vm_create(socket.try_clone().context("failed to clone socket")?, cfg) - .await?; + let response = cloud_hypervisor_vm_create(&self.api_socket, cfg).await?; if let Some(detail) = response { debug!(sl!(), "vm boot response: {:?}", detail); @@ -243,13 +235,10 @@ impl CloudHypervisorInner { if let Some(network_devices) = network_devices { for net in network_devices { let vm_fds = net.fds.clone().unwrap_or_default(); - let response = cloud_hypervisor_vm_netdev_add_with_fds( - socket.try_clone().context("failed to clone socket")?, - net, - vm_fds.clone(), - ) - .await - .context("failed to add vm netdev with fds")?; + let response = + cloud_hypervisor_vm_netdev_add_with_fds(&self.api_socket, net, vm_fds.clone()) + .await + .context("failed to add vm netdev with fds")?; if let Some(detail) = response { debug!(sl!(), "vm netdev add response: {:?}", detail); @@ -262,9 +251,7 @@ impl CloudHypervisorInner { } } - let response = - cloud_hypervisor_vm_start(socket.try_clone().context("failed to clone socket")?) - .await?; + let response = cloud_hypervisor_vm_start(&self.api_socket).await?; if let Some(detail) = response { debug!(sl!(), "vm start response: {:?}", detail); @@ -311,7 +298,7 @@ impl CloudHypervisorInner { let api_socket = result?; - self.api_socket = Some(api_socket); + *self.api_socket.lock().await = Some(api_socket); Ok(()) } @@ -471,14 +458,9 @@ impl CloudHypervisorInner { } async fn cloud_hypervisor_shutdown(&mut self) -> Result<()> { - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - - let response = - cloud_hypervisor_vmm_shutdown(socket.try_clone().context("shutdown failed")?).await?; + let response = cloud_hypervisor_vmm_shutdown(&self.api_socket) + .await + .context("shutdown failed")?; if let Some(detail) = response { debug!(sl!(), "shutdown response: {:?}", detail); @@ -564,17 +546,10 @@ impl CloudHypervisorInner { } async fn cloud_hypervisor_ping_until_ready(&mut self, _poll_time_ms: u64) -> Result<()> { - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - loop { - let response = - cloud_hypervisor_vmm_ping(socket.try_clone().context("failed to clone socket")?) - .await - .context("ping failed"); + let response = cloud_hypervisor_vmm_ping(&self.api_socket) + .await + .context("ping failed"); if let Ok(response) = response { if let Some(detail) = response { @@ -782,23 +757,14 @@ impl CloudHypervisorInner { return Ok((old_vcpus, new_vcpus)); } - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - let vmresize = VmResize { desired_vcpus: Some(new_vcpus as u8), ..Default::default() }; - cloud_hypervisor_vm_resize( - socket.try_clone().context("failed to clone socket")?, - vmresize, - ) - .await - .context("resize vcpus")?; + cloud_hypervisor_vm_resize(&self.api_socket, vmresize) + .await + .context("resize vcpus")?; Ok((old_vcpus, new_vcpus)) } @@ -877,16 +843,9 @@ impl CloudHypervisorInner { } pub(crate) async fn resize_memory(&self, new_mem_mb: u32) -> Result<(u32, MemoryConfig)> { - let socket = self - .api_socket - .as_ref() - .ok_or("missing socket") - .map_err(|e| anyhow!(e))?; - - let vminfo = - cloud_hypervisor_vm_info(socket.try_clone().context("failed to clone socket")?) - .await - .context("get vminfo")?; + let vminfo = cloud_hypervisor_vm_info(&self.api_socket) + .await + .context("get vminfo")?; let current_mem_size = vminfo.config.memory.size; let new_total_mem = megs_to_bytes(new_mem_mb); @@ -954,12 +913,9 @@ impl CloudHypervisorInner { ..Default::default() }; - cloud_hypervisor_vm_resize( - socket.try_clone().context("failed to clone socket")?, - vmresize, - ) - .await - .context("resize memory")?; + cloud_hypervisor_vm_resize(&self.api_socket, vmresize) + .await + .context("resize memory")?; Ok((new_mem_mb, MemoryConfig::default())) }