Merge pull request #5467 from tzY15368/feat-katactl-direct-vol

Feat: implementation of kata-ctl direct-volume operations
This commit is contained in:
Zhongtao Hu
2023-01-05 14:06:18 +08:00
committed by GitHub
12 changed files with 474 additions and 1887 deletions

View File

@@ -5,7 +5,7 @@
//
use anyhow::{anyhow, Context, Result};
use std::path::PathBuf;
use std::{collections::HashMap, path::PathBuf};
/// Prefix to mark a volume as Kata special.
pub const KATA_VOLUME_TYPE_PREFIX: &str = "kata:";
@@ -19,6 +19,12 @@ pub const KATA_EPHEMERAL_VOLUME_TYPE: &str = "ephemeral";
/// KATA_HOST_DIR_TYPE use for host empty dir
pub const KATA_HOST_DIR_VOLUME_TYPE: &str = "kata:hostdir";
/// KATA_MOUNT_INFO_FILE_NAME is used for the file that holds direct-volume mount info
pub const KATA_MOUNT_INFO_FILE_NAME: &str = "mountInfo.json";
/// KATA_DIRECT_VOLUME_ROOT_PATH is the root path used for concatenating with the direct-volume mount info file path
pub const KATA_DIRECT_VOLUME_ROOT_PATH: &str = "/run/kata-containers/shared/direct-volumes";
/// Information about a mount.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct Mount {
@@ -49,6 +55,22 @@ impl Mount {
}
}
/// DirectVolumeMountInfo contains the information needed by Kata
/// to consume a host block device and mount it as a filesystem inside the guest VM.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DirectVolumeMountInfo {
/// The type of the volume (ie. block)
pub volume_type: String,
/// The device backing the volume.
pub device: String,
/// The filesystem type to be mounted on the volume.
pub fs_type: String,
/// Additional metadata to pass to the agent regarding this volume.
pub metadata: HashMap<String, String>,
/// Additional mount options.
pub options: Vec<String>,
}
/// Check whether a mount type is a marker for Kata specific volume.
pub fn is_kata_special_volume(ty: &str) -> bool {
ty.len() > KATA_VOLUME_TYPE_PREFIX.len() && ty.starts_with(KATA_VOLUME_TYPE_PREFIX)

View File

@@ -35,7 +35,7 @@ impl MgmtClient {
let unix_socket_path = mgmt_socket_addr(sid).context("Failed to get unix socket path")?;
let s_addr = unix_socket_path
.strip_prefix("unix:")
.context("failed to strix prefix")?;
.context("failed to strip prefix")?;
let sock_path = Path::new("/").join(s_addr).as_path().to_owned();
let client = Client::unix();
Ok(Self {
@@ -49,32 +49,52 @@ impl MgmtClient {
/// Parameter uri should be like "/agent-url" etc.
pub async fn get(&self, uri: &str) -> Result<Response<Body>> {
let url: hyper::Uri = Uri::new(&self.sock_path, uri).into();
let work = self.client.get(url);
match self.timeout {
Some(timeout) => match tokio::time::timeout(timeout, work).await {
Ok(result) => result.map_err(|e| anyhow!(e)),
Err(_) => Err(anyhow!("TIMEOUT")),
},
// if timeout not set, work executes directly
None => work.await.context("failed to GET"),
}
let req = Request::builder()
.method(Method::GET)
.uri(url)
.body(Body::empty())?;
return self.send_request(req).await;
}
/// The HTTP Post method for client
pub async fn post(
&self,
uri: &str,
content_type: &str,
content: &str,
) -> Result<Response<Body>> {
let url: hyper::Uri = Uri::new(&self.sock_path, uri).into();
// build body from content
let body = Body::from(content.to_string());
let req = Request::builder()
.method(Method::POST)
.uri(url)
.header("content-type", content_type)
.body(body)?;
return self.send_request(req).await;
}
/// The http PUT method for client
pub async fn put(&self, uri: &str, data: Vec<u8>) -> Result<Response<Body>> {
let url: hyper::Uri = Uri::new(&self.sock_path, uri).into();
let request = Request::builder()
let req = Request::builder()
.method(Method::PUT)
.uri(url)
.body(Body::from(data))
.unwrap();
let work = self.client.request(request);
.body(Body::from(data))?;
return self.send_request(req).await;
}
async fn send_request(&self, req: Request<Body>) -> Result<Response<Body>> {
let msg = format!("Request ({:?}) to uri {:?}", req.method(), req.uri());
let resp = self.client.request(req);
match self.timeout {
Some(timeout) => match tokio::time::timeout(timeout, work).await {
Some(timeout) => match tokio::time::timeout(timeout, resp).await {
Ok(result) => result.map_err(|e| anyhow!(e)),
Err(_) => Err(anyhow!("TIMEOUT")),
Err(_) => Err(anyhow!("{:?} timeout after {:?}", msg, self.timeout)),
},
None => work.await.context("failed to PUT"),
// if client timeout is not set, request waits with no deadline
None => resp.await.context(format!("{:?} failed", msg)),
}
}
}

View File

@@ -20,8 +20,8 @@ pub use types::{
GetIPTablesResponse, GuestDetailsResponse, HealthCheckResponse, IPAddress, IPFamily, Interface,
Interfaces, ListProcessesRequest, MemHotplugByProbeRequest, OnlineCPUMemRequest,
OomEventResponse, ReadStreamRequest, ReadStreamResponse, RemoveContainerRequest,
ReseedRandomDevRequest, Route, Routes, SetGuestDateTimeRequest, SetIPTablesRequest,
SetIPTablesResponse, SignalProcessRequest, StatsContainerResponse, Storage,
ReseedRandomDevRequest, ResizeVolumeRequest, Route, Routes, SetGuestDateTimeRequest,
SetIPTablesRequest, SetIPTablesResponse, SignalProcessRequest, StatsContainerResponse, Storage,
TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest,
VersionCheckResponse, WaitProcessRequest, WaitProcessResponse, WriteStreamRequest,
WriteStreamResponse,

View File

@@ -7,7 +7,7 @@
use anyhow::{anyhow, Result};
use std::convert::TryFrom;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
#[derive(PartialEq, Clone, Default)]
pub struct Empty {}
@@ -561,6 +561,14 @@ pub struct OomEventResponse {
pub container_id: String,
}
// ResizeVolumeRequest is also the common struct for serialization and deserialization with json
// between shim-client HTTP calls to the shim-mgmt-server
#[derive(Serialize, Deserialize, PartialEq, Clone, Default, Debug)]
pub struct ResizeVolumeRequest {
pub volume_guest_path: String,
pub size: u64,
}
#[cfg(test)]
mod test {
use std::convert::TryFrom;

File diff suppressed because it is too large Load Diff

View File

@@ -21,9 +21,15 @@ privdrop = "0.5.2"
nix = "0.25.0"
strum = "0.24.1"
strum_macros = "0.24.3"
serde = { version = "1.0.149", features = ["derive"] }
url = "2.3.1"
futures = "0.3.24"
base64 = "0.13.0"
runtimes = { path = "../../runtime-rs/crates/runtimes" }
serde = "1.0.149"
shim-interface = { path = "../../libs/shim-interface"}
kata-types = { path = "../../libs/kata-types" }
safe-path = { path = "../../libs/safe-path" }
agent = { path = "../../runtime-rs/crates/agent"}
[target.'cfg(target_arch = "s390x")'.dependencies]
reqwest = { version = "0.11", default-features = false, features = ["json", "blocking", "native-tls"] }
@@ -34,3 +40,4 @@ reqwest = { version = "0.11", default-features = false, features = ["json", "blo
[dev-dependencies]
semver = "1.0.12"
tempfile = "3.1.0"
test-utils = { path = "../../libs/test-utils" }

View File

@@ -20,7 +20,7 @@ pub enum Commands {
Check(CheckArgument),
/// Directly assign a volume to Kata Containers to manage
DirectVolume,
DirectVolume(DirectVolumeCommand),
/// Display settings
Env,
@@ -93,3 +93,46 @@ pub enum IpTablesArguments {
/// Configure iptables
Metrics,
}
#[derive(Debug, Args)]
pub struct DirectVolumeCommand {
#[clap(subcommand)]
pub directvol_cmd: DirectVolSubcommand,
}
#[derive(Debug, Subcommand)]
pub enum DirectVolSubcommand {
/// Add a direct assigned block volume device to the Kata Containers runtime
Add(DirectVolAddArgs),
/// Remove a direct assigned block volume device from the Kata Containers runtime
Remove(DirectVolRemoveArgs),
/// Get the filesystem stat of a direct assigned volume
Stats(DirectVolStatsArgs),
/// Resize a direct assigned block volume
Resize(DirectVolResizeArgs),
}
#[derive(Debug, Args)]
pub struct DirectVolAddArgs {
pub volume_path: String,
pub mount_info: String,
}
#[derive(Debug, Args)]
pub struct DirectVolRemoveArgs {
pub volume_path: String,
}
#[derive(Debug, Args)]
pub struct DirectVolStatsArgs {
pub volume_path: String,
}
#[derive(Debug, Args)]
pub struct DirectVolResizeArgs {
pub volume_path: String,
pub resize_size: u64,
}

View File

@@ -42,7 +42,7 @@ pub fn get_single_cpu_info(cpu_info_file: &str, substring: &str) -> Result<Strin
let contents = get_cpu_info(cpu_info_file)?;
if contents.is_empty() {
return Err(anyhow!("cpu_info string is empty"))?;
return Err(anyhow!("cpu_info string is empty"));
}
let subcontents: Vec<&str> = contents.split(substring).collect();
@@ -60,7 +60,7 @@ pub fn get_single_cpu_info(cpu_info_file: &str, substring: &str) -> Result<Strin
#[cfg(any(target_arch = "s390x", target_arch = "x86_64"))]
pub fn get_cpu_flags(cpu_info: &str, cpu_flags_tag: &str) -> Result<String> {
if cpu_info.is_empty() {
return Err(anyhow!("cpu_info string is empty"))?;
return Err(anyhow!("cpu_info string is empty"));
}
let subcontents: Vec<&str> = cpu_info.split('\n').collect();

View File

@@ -17,16 +17,17 @@ use std::process::exit;
use args::{Commands, KataCtlCli};
use ops::check_ops::{
handle_check, handle_check_volume, handle_env, handle_exec, handle_factory, handle_iptables,
handle_metrics, handle_version,
handle_check, handle_env, handle_exec, handle_factory, handle_iptables, handle_metrics,
handle_version,
};
use ops::volume_ops::handle_direct_volume;
fn real_main() -> Result<()> {
let args = KataCtlCli::parse();
match args.command {
Commands::Check(args) => handle_check(args),
Commands::DirectVolume => handle_check_volume(),
Commands::DirectVolume(args) => handle_direct_volume(args),
Commands::Env => handle_env(),
Commands::Exec => handle_exec(),
Commands::Factory => handle_factory(),

View File

@@ -5,3 +5,4 @@
pub mod check_ops;
pub mod version;
pub mod volume_ops;

View File

@@ -114,10 +114,6 @@ pub fn handle_check(checkcmd: CheckArgument) -> Result<()> {
Ok(())
}
pub fn handle_check_volume() -> Result<()> {
Ok(())
}
pub fn handle_env() -> Result<()> {
Ok(())
}

View File

@@ -0,0 +1,293 @@
// Copyright (c) 2022 Boston University
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::args::{DirectVolSubcommand, DirectVolumeCommand};
use anyhow::{anyhow, Ok, Result};
use futures::executor;
use kata_types::mount::{
DirectVolumeMountInfo, KATA_DIRECT_VOLUME_ROOT_PATH, KATA_MOUNT_INFO_FILE_NAME,
};
use nix;
use reqwest::StatusCode;
use safe_path;
use std::{fs, path::PathBuf, time::Duration};
use url;
use agent::ResizeVolumeRequest;
use shim_interface::shim_mgmt::client::MgmtClient;
use shim_interface::shim_mgmt::{
DIRECT_VOLUME_PATH_KEY, DIRECT_VOLUME_RESIZE_URL, DIRECT_VOLUME_STATS_URL,
};
const TIMEOUT: Duration = Duration::from_millis(2000);
const CONTENT_TYPE_JSON: &str = "application/json";
pub fn handle_direct_volume(vol_cmd: DirectVolumeCommand) -> Result<()> {
if !nix::unistd::Uid::effective().is_root() {
return Err(anyhow!(
"super-user privileges are required for the direct-volume subcommand"
));
}
let command = vol_cmd.directvol_cmd;
let cmd_result: Option<String> = match command {
DirectVolSubcommand::Add(args) => add(&args.volume_path, &args.mount_info)?,
DirectVolSubcommand::Remove(args) => remove(&args.volume_path)?,
DirectVolSubcommand::Stats(args) => executor::block_on(stats(&args.volume_path))?,
DirectVolSubcommand::Resize(args) => {
executor::block_on(resize(&args.volume_path, args.resize_size))?
}
};
if let Some(cmd_result) = cmd_result {
println!("{:?}", cmd_result);
}
Ok(())
}
async fn resize(volume_path: &str, size: u64) -> Result<Option<String>> {
let sandbox_id = get_sandbox_id_for_volume(volume_path)?;
let mount_info = get_volume_mount_info(volume_path)?;
let resize_req = ResizeVolumeRequest {
size,
volume_guest_path: mount_info.device,
};
let encoded = serde_json::to_string(&resize_req)?;
let shim_client = MgmtClient::new(&sandbox_id, Some(TIMEOUT))?;
let url = DIRECT_VOLUME_RESIZE_URL;
let response = shim_client
.post(url, &String::from(CONTENT_TYPE_JSON), &encoded)
.await?;
let status = response.status();
if status != StatusCode::OK {
let body = format!("{:?}", response.into_body());
return Err(anyhow!(
"failed to resize volume ({:?}): {:?}",
status,
body
));
}
Ok(None)
}
async fn stats(volume_path: &str) -> Result<Option<String>> {
let sandbox_id = get_sandbox_id_for_volume(volume_path)?;
let mount_info = get_volume_mount_info(volume_path)?;
let req_url = url::form_urlencoded::Serializer::new(String::from(DIRECT_VOLUME_STATS_URL))
.append_pair(DIRECT_VOLUME_PATH_KEY, &mount_info.device)
.finish();
let shim_client = MgmtClient::new(&sandbox_id, Some(TIMEOUT))?;
let response = shim_client.get(&req_url).await?;
// turn body into string
let body = format!("{:?}", response.into_body());
Ok(Some(body))
}
// join_path joins user provided volumepath with kata direct-volume root path
// the volume_path is base64-encoded and then safely joined to the end of path prefix
fn join_path(prefix: &str, volume_path: &str) -> Result<PathBuf> {
if volume_path.is_empty() {
return Err(anyhow!("volume path must not be empty"));
}
let b64_encoded_path = base64::encode(volume_path.as_bytes());
Ok(safe_path::scoped_join(prefix, b64_encoded_path)?)
}
// add writes the mount info (json string) of a direct volume into a filesystem path known to Kata Containers.
pub fn add(volume_path: &str, mount_info: &str) -> Result<Option<String>> {
let mount_info_dir_path = join_path(KATA_DIRECT_VOLUME_ROOT_PATH, volume_path)?;
// create directory if missing
fs::create_dir_all(&mount_info_dir_path)?;
// This behavior of deserializing and serializing comes from
// https://github.com/kata-containers/kata-containers/blob/cd27ad144e1a111cb606015c5c9671431535e644/src/runtime/pkg/direct-volume/utils.go#L57-L79
// Assuming that this is for the purpose of validating the json schema.
let unserialized_mount_info: DirectVolumeMountInfo = serde_json::from_str(mount_info)?;
let mount_info_file_path = mount_info_dir_path.join(KATA_MOUNT_INFO_FILE_NAME);
let serialized_mount_info = serde_json::to_string(&unserialized_mount_info)?;
fs::write(mount_info_file_path, serialized_mount_info)?;
Ok(None)
}
// remove deletes the direct volume path including all the files inside it.
pub fn remove(volume_path: &str) -> Result<Option<String>> {
let path = join_path(KATA_DIRECT_VOLUME_ROOT_PATH, volume_path)?;
// removes path and any children it contains.
fs::remove_dir_all(path)?;
Ok(None)
}
pub fn get_volume_mount_info(volume_path: &str) -> Result<DirectVolumeMountInfo> {
let mount_info_file_path =
join_path(KATA_DIRECT_VOLUME_ROOT_PATH, volume_path)?.join(KATA_MOUNT_INFO_FILE_NAME);
let mount_info_file = fs::read_to_string(mount_info_file_path)?;
let mount_info: DirectVolumeMountInfo = serde_json::from_str(&mount_info_file)?;
Ok(mount_info)
}
// get_sandbox_id_for_volume finds the id of the first sandbox found in the dir.
// We expect a direct-assigned volume is associated with only a sandbox at a time.
pub fn get_sandbox_id_for_volume(volume_path: &str) -> Result<String> {
let dir_path = join_path(KATA_DIRECT_VOLUME_ROOT_PATH, volume_path)?;
let paths = fs::read_dir(dir_path)?;
for path in paths {
let path = path?;
// compare with MOUNT_INFO_FILE_NAME
if path.file_name() == KATA_MOUNT_INFO_FILE_NAME {
continue;
}
let file_name = path.file_name();
// turn file_name into String and return it
let file_name = file_name.to_str().ok_or_else(|| {
anyhow!(
"failed to convert file_name {:?} to string",
file_name.to_string_lossy()
)
})?;
return Ok(String::from(file_name));
}
return Err(anyhow!("no sandbox found for {}", volume_path));
}
#[cfg(test)]
mod tests {
use super::*;
use kata_types::mount::DirectVolumeMountInfo;
use std::{collections::HashMap, fs};
use tempfile::tempdir;
use test_utils::skip_if_not_root;
#[test]
fn test_get_sandbox_id_for_volume() {
// this test has to run as root, so has to manually cleanup afterwards
skip_if_not_root!();
// create KATA_DIRECT_VOLUME_ROOT_PATH first as safe_path::scoped_join
// requires prefix dir to exist
fs::create_dir_all(KATA_DIRECT_VOLUME_ROOT_PATH)
.expect("create kata direct volume root path failed");
let test_sandbox_id = "sandboxid_test_file";
let test_volume_path = String::from("a/b/c");
let joined_volume_path =
join_path(KATA_DIRECT_VOLUME_ROOT_PATH, &test_volume_path).unwrap();
let test_file_dir = joined_volume_path.join(test_sandbox_id);
fs::create_dir_all(&joined_volume_path).expect("failed to mkdir -p");
fs::write(&test_file_dir, "teststring").expect("failed to write");
// test that get_sandbox_id gets the correct sandboxid it sees
let got = get_sandbox_id_for_volume(&test_volume_path).unwrap();
assert!(got.eq(test_sandbox_id));
// test that get_sandbox_id returns error if no sandboxid found
fs::remove_file(&test_file_dir).expect("failed to remove");
get_sandbox_id_for_volume(&test_volume_path).expect_err("error expected");
// cleanup test directory
fs::remove_dir_all(&joined_volume_path).expect("failed to cleanup test")
}
#[test]
fn test_path_join() {
#[derive(Debug)]
struct TestData<'a> {
rootfs: &'a str,
volume_path: &'a str,
result: Result<PathBuf>,
}
// the safe_path::scoped_join requires the prefix path to exist on testing machine
let root_fs = tempdir().expect("failed to create tmpdir").into_path();
let root_fs_str = root_fs.to_str().unwrap();
let relative_secret_path = "../../etc/passwd";
let b64_relative_secret_path = base64::encode(relative_secret_path);
// this byte array b64encodes to "/abcdddd"
let b64_abs_path = vec![253, 166, 220, 117, 215, 93];
let converted_relative_path = "abcdddd";
let tests = &[
TestData {
rootfs: root_fs_str,
volume_path: "",
result: Err(anyhow!("volume path must not be empty")),
},
TestData {
rootfs: root_fs_str,
volume_path: relative_secret_path,
result: Ok(root_fs.join(b64_relative_secret_path)),
},
TestData {
rootfs: root_fs_str,
volume_path: unsafe { std::str::from_utf8_unchecked(&b64_abs_path) },
result: Ok(root_fs.join(converted_relative_path)),
},
];
for (i, d) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, d);
let result = join_path(d.rootfs, d.volume_path);
let msg = format!("{}, result: {:?}", msg, result);
if d.result.is_ok() {
assert!(
result.as_ref().unwrap() == d.result.as_ref().unwrap(),
"{}",
msg
);
continue;
}
let expected_error = format!("{}", d.result.as_ref().unwrap_err());
let actual_error = format!("{}", result.unwrap_err());
assert!(actual_error == expected_error, "{}", msg);
}
}
#[test]
fn test_add_remove() {
skip_if_not_root!();
// example volume dir is a/b/c, note the behavior of join would take "/a" as absolute path.
// testing with isn't really viable here since the path is then b64 encoded,
// so this test had to run as root and call `remove()` to manully cleanup afterwards.
fs::create_dir_all(KATA_DIRECT_VOLUME_ROOT_PATH)
.expect("create kata direct volume root path failed");
let base_dir = tempdir().expect("failed to create tmpdir");
let dir_name = base_dir.path().join("a/b/c");
let volume_path = String::from(dir_name.to_str().unwrap());
let actual: DirectVolumeMountInfo = DirectVolumeMountInfo {
volume_type: String::from("block"),
device: String::from("/dev/sda"),
fs_type: String::from("ext4"),
metadata: HashMap::new(),
options: vec![String::from("journal_dev"), String::from("noload")],
};
// serialize volumemountinfo into json string
let mount_info = serde_json::to_string(&actual).unwrap();
add(&volume_path, &mount_info).expect("add failed");
let expected_file_path = volume_path;
let expected: DirectVolumeMountInfo = get_volume_mount_info(&expected_file_path).unwrap();
remove(&expected_file_path).expect("remove failed");
assert_eq!(actual.device, expected.device);
assert_eq!(actual.fs_type, expected.fs_type);
assert_eq!(actual.metadata, expected.metadata);
assert_eq!(actual.options, expected.options);
assert_eq!(actual.volume_type, expected.volume_type);
}
}