diff --git a/Cargo.lock b/Cargo.lock index ed57a852a0..7d8a405b5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6308,6 +6308,8 @@ dependencies = [ "futures", "futures-lite 2.6.1", "hex", + "hyper 0.14.32", + "hyperlocal", "hypervisor", "inotify 0.11.1", "kata-sys-util", diff --git a/src/runtime-rs/crates/resource/Cargo.toml b/src/runtime-rs/crates/resource/Cargo.toml index e7e4407daf..29dfa609fc 100644 --- a/src/runtime-rs/crates/resource/Cargo.toml +++ b/src/runtime-rs/crates/resource/Cargo.toml @@ -39,6 +39,8 @@ flate2 = "1.1" tempfile = "3.19.1" hex = "0.4" base64 = { workspace = true } +hyper = { workspace = true, features = ["client", "http1"] } +hyperlocal = { workspace = true } ## Dependencies from `rust-netlink` ## 0.30+ parses IFLA_INET6_CONF on kernels 6.17+ (240-byte blob; DEVCONF_FORCE_FORWARDING). diff --git a/src/runtime-rs/crates/resource/src/share_fs/mod.rs b/src/runtime-rs/crates/resource/src/share_fs/mod.rs index 98e81839b0..84caf5c233 100644 --- a/src/runtime-rs/crates/resource/src/share_fs/mod.rs +++ b/src/runtime-rs/crates/resource/src/share_fs/mod.rs @@ -3,6 +3,9 @@ // // SPDX-License-Identifier: Apache-2.0 // +// + +mod nydus; mod share_virtio_fs; pub use share_virtio_fs::rafs_mount; diff --git a/src/runtime-rs/crates/resource/src/share_fs/nydus/mod.rs b/src/runtime-rs/crates/resource/src/share_fs/nydus/mod.rs new file mode 100644 index 0000000000..936ec5d48e --- /dev/null +++ b/src/runtime-rs/crates/resource/src/share_fs/nydus/mod.rs @@ -0,0 +1,45 @@ +// Copyright (c) 2026 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// +// + +pub mod nydus_client; + +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MountRequest { + pub fs_type: String, + pub source: PathBuf, + pub config: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub overlay: Option, +} + +#[allow(dead_code)] +impl MountRequest { + pub fn new(fs_type: &str, source: &Path, config: &str) -> Self { + Self { + fs_type: fs_type.to_string(), + source: source.to_path_buf(), + config: config.to_string(), + overlay: None, + } + } + + pub fn new_with_overlay( + fs_type: &str, + source: &Path, + config: &str, + overlay_config: &str, + ) -> Self { + Self { + fs_type: fs_type.to_string(), + source: source.to_path_buf(), + config: config.to_string(), + overlay: Some(overlay_config.to_string()), + } + } +} diff --git a/src/runtime-rs/crates/resource/src/share_fs/nydus/nydus_client.rs b/src/runtime-rs/crates/resource/src/share_fs/nydus/nydus_client.rs new file mode 100644 index 0000000000..1a519d611b --- /dev/null +++ b/src/runtime-rs/crates/resource/src/share_fs/nydus/nydus_client.rs @@ -0,0 +1,216 @@ +// Copyright (c) 2026 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use hyper::{body::to_bytes, Body, Client, Method, Request, StatusCode}; +use hyperlocal::{UnixClientExt, Uri}; +use serde::{Deserialize, Serialize}; +use tokio::time::{timeout, Duration}; + +use crate::share_fs::nydus::MountRequest; + +const HTTP_CLIENT_TIMEOUT_SECS: u64 = 30; +// Keep the per-probe timeout short relative to the total readiness timeout so a +// single slow/hung probe cannot consume the whole budget and starve the retry +// loop (which would make `max_attempts` largely ineffective). +const HTTP_READY_CHECK_TIMEOUT_SECS: u64 = 1; +const HTTP_READY_TOTAL_TIMEOUT_SECS: u64 = 10; + +const INFO_ENDPOINT: &str = "/api/v1/daemon"; +const MOUNT_ENDPOINT: &str = "/api/v1/mount"; + +const NYDUSD_DAEMON_STATE_RUNNING: &str = "RUNNING"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BuildTimeInfo { + pub package_ver: String, + pub git_commit: String, + pub build_time: String, + pub profile: String, + pub rustc: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonInfo { + pub version: BuildTimeInfo, + #[serde(default)] + pub id: Option, + #[serde(default)] + pub supervisor: Option, + pub state: String, + #[serde(default)] + pub backend_collection: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorMessage { + pub code: String, + pub message: String, +} + +pub struct NydusClient { + sock_path: PathBuf, + client: Client, +} + +#[allow(dead_code)] +impl NydusClient { + pub fn new(sock_path: &Path) -> Self { + Self { + sock_path: sock_path.to_path_buf(), + client: Client::unix(), + } + } + + async fn send_request( + &self, + method: Method, + path: &str, + body: Option<&str>, + ) -> Result<(StatusCode, Vec)> { + self.send_request_with_timeout(method, path, body, HTTP_CLIENT_TIMEOUT_SECS) + .await + } + + async fn send_request_with_timeout( + &self, + method: Method, + path: &str, + body: Option<&str>, + timeout_secs: u64, + ) -> Result<(StatusCode, Vec)> { + let uri: hyper::Uri = Uri::new(&self.sock_path, path).into(); + + let request_builder = Request::builder() + .method(method) + .uri(uri) + .header("Content-Type", "application/json"); + + let req = match body { + Some(b) => request_builder + .body(Body::from(b.to_string())) + .context("failed to build HTTP request with body")?, + None => request_builder + .body(Body::empty()) + .context("failed to build HTTP request")?, + }; + + let response = timeout(Duration::from_secs(timeout_secs), self.client.request(req)) + .await + .context("timeout waiting for response")? + .context("failed to send HTTP request")?; + + let status = response.status(); + let body_bytes = to_bytes(response.into_body()) + .await + .context("failed to read response body")?; + + Ok((status, body_bytes.to_vec())) + } + + pub async fn check_status(&self) -> Result { + let (status, body) = self.send_request(Method::GET, INFO_ENDPOINT, None).await?; + + if status != StatusCode::OK { + return Err(anyhow!("nydusd check status failed with code {}", status)); + } + + let info: DaemonInfo = + serde_json::from_slice(&body).context("failed to parse DaemonInfo")?; + Ok(info) + } + + pub async fn mount(&self, mountpoint: &str, req: &MountRequest) -> Result<()> { + let path = format!( + "{}?mountpoint={}", + MOUNT_ENDPOINT, + percent_encode_query_value(mountpoint) + ); + let body = serde_json::to_string(req).context("failed to serialize MountRequest")?; + let (status, resp_body) = self.send_request(Method::POST, &path, Some(&body)).await?; + + if status == StatusCode::NO_CONTENT { + return Ok(()); + } + + let err: ErrorMessage = + serde_json::from_slice(&resp_body).context("failed to parse error message")?; + Err(anyhow!("nydusd mount failed: {}", err.message)) + } + + pub async fn umount(&self, mountpoint: &str) -> Result<()> { + let path = format!( + "{}?mountpoint={}", + MOUNT_ENDPOINT, + percent_encode_query_value(mountpoint) + ); + let (status, resp_body) = self.send_request(Method::DELETE, &path, None).await?; + + if status == StatusCode::NO_CONTENT { + return Ok(()); + } + + let err: ErrorMessage = + serde_json::from_slice(&resp_body).context("failed to parse error message")?; + Err(anyhow!("nydusd umount failed: {}", err.message)) + } + + pub async fn wait_until_ready(&self, max_attempts: u32, delay_ms: u64) -> Result<()> { + timeout(Duration::from_secs(HTTP_READY_TOTAL_TIMEOUT_SECS), async { + for _ in 0..max_attempts { + match self + .check_status_with_timeout(HTTP_READY_CHECK_TIMEOUT_SECS) + .await + { + Ok(info) if info.state == NYDUSD_DAEMON_STATE_RUNNING => { + return Ok(()); + } + Ok(info) => { + debug!(sl!(), "nydusd state: {}, waiting...", info.state); + } + Err(e) => { + debug!(sl!(), "nydusd not ready: {}", e); + } + } + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + + Err(anyhow!( + "nydusd API server not ready after {} attempts", + max_attempts + )) + }) + .await + .context("timeout waiting for nydusd API server ready")? + } + + async fn check_status_with_timeout(&self, timeout_secs: u64) -> Result { + let (status, body) = self + .send_request_with_timeout(Method::GET, INFO_ENDPOINT, None, timeout_secs) + .await?; + + if status != StatusCode::OK { + return Err(anyhow!("nydusd check status failed with code {}", status)); + } + + let info: DaemonInfo = + serde_json::from_slice(&body).context("failed to parse DaemonInfo")?; + Ok(info) + } +} + +fn percent_encode_query_value(value: &str) -> String { + value + .bytes() + .flat_map(|byte| match byte { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => { + vec![byte as char] + } + _ => format!("%{byte:02X}").chars().collect(), + }) + .collect() +}