runtime-rs: shim implements for runtime-rs

Responsible for processing shim related commands: start, delete.

This patch is extracted from Alibaba Cloud's internal repository *runD*
Thanks to all contributors!

Fixes: #3785
Signed-off-by: acetang <aceapril@126.com>
Signed-off-by: Bin Liu <bin@hyper.sh>
Signed-off-by: Chao Wu <chaowu@linux.alibaba.com>
Signed-off-by: Eryu Guan <eguan@linux.alibaba.com>
Signed-off-by: Fupan Li <lifupan@gmail.com>
Signed-off-by: gexuyang <gexuyang@linux.alibaba.com>
Signed-off-by: Helin Guo <helinguo@linux.alibaba.com>
Signed-off-by: He Rongguang <herongguang@linux.alibaba.com>
Signed-off-by: Hui Zhu <teawater@gmail.com>
Signed-off-by: Issac Hai <hjwissac@linux.alibaba.com>
Signed-off-by: Jiahuan Chao <jhchao@linux.alibaba.com>
Signed-off-by: lichenglong9 <lichenglong9@163.com>
Signed-off-by: mengze <mengze@linux.alibaba.com>
Signed-off-by: Qingyuan Hou <qingyuan.hou@linux.alibaba.com>
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
Signed-off-by: shiqiangzhang <shiyu.zsq@linux.alibaba.com>
Signed-off-by: Simon Guo <wei.guo.simon@linux.alibaba.com>
Signed-off-by: Tim Zhang <tim@hyper.sh>
Signed-off-by: wanglei01 <wllenyj@linux.alibaba.com>
Signed-off-by: Wei Yang <wei.yang1@linux.alibaba.com>
Signed-off-by: yanlei <yl.on.the.way@gmail.com>
Signed-off-by: Yiqun Leng <yqleng@linux.alibaba.com>
Signed-off-by: yuchang.xu <yuchang.xu@linux.alibaba.com>
Signed-off-by: Yves Chan <lingfu@linux.alibaba.com>
Signed-off-by: Zack <zmlcc@linux.alibaba.com>
Signed-off-by: Zhiheng Tao <zhihengtao@linux.alibaba.com>
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
Signed-off-by: Zizheng Bian <zizheng.bian@linux.alibaba.com>
This commit is contained in:
Zack
2022-02-22 15:39:56 +08:00
committed by Fupan Li
parent 641b736106
commit 278f843f92
23 changed files with 3288 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
[package]
name = "shim"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
description = "Containerd shim runtime for Kata Containers"
keywords = ["kata-containers", "shim"]
repository = "https://github.com/kata-containers/kata-containers.git"
license = "Apache-2.0"
edition = "2018"
[[bin]]
name = "containerd-shim-kata-v2"
path = "src/bin/main.rs"
[dependencies]
anyhow = "^1.0"
backtrace = {version = ">=0.3.35", features = ["libunwind", "libbacktrace", "std"], default-features = false}
# TODO: change to version after release
# issue: https://github.com/kata-containers/kata-containers/issues/3866
containerd-shim-protos = { git="https://github.com/containerd/rust-extensions.git", rev = "c0baac598fc3ad62f651e8aae8de15db2ce5695c", features = ["async"]}
go-flag = "0.1.0"
libc = "0.2.108"
log = "0.4.14"
nix = "0.16.0"
protobuf = "2.23.0"
sha2 = "=0.9.3"
slog = {version = "2.7.0", features = ["std", "release_max_level_trace", "max_level_trace"]}
slog-async = "2.7.0"
slog-scope = "4.4.0"
slog-stdlog = "4.1.0"
thiserror = "1.0.30"
tokio = { version = "1.8.0", features = [ "rt", "rt-multi-thread" ] }
unix_socket2 = "0.5.4"
kata-types = { path = "../../../libs/kata-types"}
kata-sys-util = { path = "../../../libs/kata-sys-util"}
logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" }
[build-dependencies]
vergen = { version = "6", default-features = false, features = ["build", "git", "rustc"] }
[dev-dependencies]
tempfile = "3.2.0"
serial_test = "0.5.1"
tests_utils = { path = "../../tests/utils"}

View File

@@ -0,0 +1,12 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use vergen::{vergen, Config};
fn main() {
// Generate the default 'cargo:' instruction output
vergen(Config::default()).unwrap();
}

View File

@@ -0,0 +1,325 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{os::unix::fs::FileTypeExt, path::PathBuf};
use anyhow::{anyhow, Context, Result};
use kata_sys_util::validate;
use crate::Error;
/// Received command-line arguments or environment arguments
/// from a shimv2 container manager such as containerd.
///
/// For detailed information, please refer to the
/// [shim spec](https://github.com/containerd/containerd/blob/main/runtime/v2/README.md).
#[derive(Debug, Default, Clone)]
pub struct Args {
/// the id of the container
pub id: String,
/// the namespace for the container
pub namespace: String,
/// the address of the containerd's main socket
pub address: String,
/// the binary path to publish events back to containerd
pub publish_binary: String,
/// Abstract socket path to serve.
pub socket: String,
/// the path to the bundle to delete
pub bundle: String,
/// Whether or not to enable debug
pub debug: bool,
}
impl Args {
/// Check the shim argument object is vaild or not.
///
/// The id, namespace, address and publish_binary are mandatory for START, RUN and DELETE.
/// And bundle is mandatory for DELETE.
pub fn validate(&mut self, should_check_bundle: bool) -> Result<()> {
if self.id.is_empty()
|| self.namespace.is_empty()
|| self.address.is_empty()
|| self.publish_binary.is_empty()
{
return Err(anyhow!(Error::ArgumentIsEmpty(format!(
"id: {} namespace: {} address: {} publish_binary: {}",
&self.id, &self.namespace, &self.address, &self.publish_binary
))));
}
validate::verify_cid(&self.id).context("verify cid")?;
validate::verify_cid(&self.namespace).context("verify namespace")?;
// Ensure `address` is a valid path.
let path = PathBuf::from(self.address.clone())
.canonicalize()
.context(Error::InvalidPath(self.address.clone()))?;
let md = path
.metadata()
.context(Error::FileGetMetadata(format!("{:?}", path)))?;
if !md.file_type().is_socket() {
return Err(Error::InvalidArgument).context("address is not socket");
}
self.address = path
.to_str()
.map(|v| v.to_owned())
.ok_or(Error::InvalidArgument)?;
// Ensure `bundle` is a valid path.
if should_check_bundle {
if self.bundle.is_empty() {
return Err(anyhow!(Error::ArgumentIsEmpty("bundle".to_string())));
}
let path = PathBuf::from(self.bundle.clone())
.canonicalize()
.map_err(|_| Error::InvalidArgument)?;
let md = path
.metadata()
.map_err(|_| Error::InvalidArgument)
.context("get address metadata")?;
if !md.is_dir() {
return Err(Error::InvalidArgument).context("medata is dir");
}
self.bundle = path
.to_str()
.map(|v| v.to_owned())
.ok_or(Error::InvalidArgument)
.context("path to string")?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::net::UnixListener;
use anyhow::anyhow;
use kata_sys_util::validate;
#[test]
fn test_args_is_valid() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().to_path_buf();
let path = path.to_str().unwrap();
let bind_address = &format!("{}/socket1", path);
UnixListener::bind(bind_address).unwrap();
#[derive(Debug)]
struct TestData {
arg: Args,
should_check_bundle: bool,
result: Result<()>,
}
let default_id = "1dfc0567".to_string();
let default_namespace = "ns1".to_string();
let default_address = bind_address.to_string();
let default_publish_binary = "containerd".to_string();
let default_socket = "socket".to_string();
let default_bundle = path.to_string();
let default_debug = false;
let mut arg = Args {
id: default_id.clone(),
namespace: default_namespace.clone(),
address: default_address.clone(),
publish_binary: default_publish_binary.clone(),
socket: default_socket,
bundle: default_bundle.clone(),
debug: default_debug,
};
let tests = &[
TestData {
arg: arg.clone(),
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: {
arg.namespace = "".to_string();
arg.clone()
},
should_check_bundle: false,
result: Err(anyhow!(Error::ArgumentIsEmpty(format!(
"id: {} namespace: {} address: {} publish_binary: {}",
&arg.id, &arg.namespace, &arg.address, &arg.publish_binary
)))),
},
TestData {
arg: {
arg.namespace = default_namespace.clone();
arg.clone()
},
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: {
arg.id = "".to_string();
arg.clone()
},
should_check_bundle: false,
result: Err(anyhow!(Error::ArgumentIsEmpty(format!(
"id: {} namespace: {} address: {} publish_binary: {}",
&arg.id, &arg.namespace, &arg.address, &arg.publish_binary
)))),
},
TestData {
arg: {
arg.id = default_id;
arg.clone()
},
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: {
arg.address = "".to_string();
arg.clone()
},
should_check_bundle: false,
result: Err(anyhow!(Error::ArgumentIsEmpty(format!(
"id: {} namespace: {} address: {} publish_binary: {}",
&arg.id, &arg.namespace, &arg.address, &arg.publish_binary
)))),
},
TestData {
arg: {
arg.address = default_address.clone();
arg.clone()
},
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: {
arg.publish_binary = "".to_string();
arg.clone()
},
should_check_bundle: false,
result: Err(anyhow!(Error::ArgumentIsEmpty(format!(
"id: {} namespace: {} address: {} publish_binary: {}",
&arg.id, &arg.namespace, &arg.address, &arg.publish_binary
)))),
},
TestData {
arg: {
arg.publish_binary = default_publish_binary;
arg.clone()
},
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: {
arg.bundle = "".to_string();
arg.clone()
},
should_check_bundle: false,
result: Ok(()),
},
TestData {
arg: arg.clone(),
should_check_bundle: true,
result: Err(anyhow!(Error::ArgumentIsEmpty("bundle".to_string()))),
},
TestData {
arg: {
arg.bundle = default_bundle;
arg.clone()
},
should_check_bundle: true,
result: Ok(()),
},
TestData {
arg: {
arg.namespace = "id1/id2".to_string();
arg.clone()
},
should_check_bundle: true,
result: Err(
anyhow!(validate::Error::InvalidContainerID("id/id2".to_string()))
.context("verify namespace"),
),
},
TestData {
arg: {
arg.namespace = default_namespace.clone() + "id1 id2";
arg.clone()
},
should_check_bundle: true,
result: Err(anyhow!(validate::Error::InvalidContainerID(
default_namespace.clone() + "id1 id2",
))
.context("verify namespace")),
},
TestData {
arg: {
arg.namespace = default_namespace.clone() + "id2\tid2";
arg.clone()
},
should_check_bundle: true,
result: Err(anyhow!(validate::Error::InvalidContainerID(
default_namespace.clone() + "id1\tid2",
))
.context("verify namespace")),
},
TestData {
arg: {
arg.namespace = default_namespace;
arg.clone()
},
should_check_bundle: true,
result: Ok(()),
},
TestData {
arg: {
arg.address = default_address.clone() + "/..";
arg.clone()
},
should_check_bundle: true,
result: Err(anyhow!(Error::InvalidPath(arg.address.clone()))),
},
TestData {
arg: {
arg.address = default_address.clone() + "/..";
arg.clone()
},
should_check_bundle: true,
result: Err(anyhow!(Error::InvalidPath(arg.address.clone()))),
},
TestData {
arg: {
arg.address = default_address;
arg
},
should_check_bundle: true,
result: Ok(()),
},
];
for (i, t) in tests.iter().enumerate() {
let msg = format!("test[{}]: {:?}", i, t);
let should_check_bundle = t.should_check_bundle;
let result = t.arg.clone().validate(should_check_bundle);
let msg = format!("{}, result: {:?}", msg, result);
if t.result.is_ok() {
assert!(result.is_ok(), "{}", msg);
} else {
let expected_error = format!("{}", t.result.as_ref().unwrap_err());
let actual_error = format!("{}", result.unwrap_err());
assert!(actual_error == expected_error, "{}", msg);
}
}
}
}

View File

@@ -0,0 +1,192 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
ffi::{OsStr, OsString},
path::PathBuf,
};
use anyhow::{anyhow, Context, Result};
use nix::{
mount::{mount, MsFlags},
sched::{self, CloneFlags},
};
use shim::{config, Args, Error, ShimExecutor};
const DEFAULT_RUNTIME_WORKER_THREADS: usize = 2;
const ENV_RUNTIME_WORKER_THREADS: &str = "RUNTIME_WORKER_THREADS";
#[derive(Debug)]
enum Action {
Run(Args),
Start(Args),
Delete(Args),
Help,
Version,
}
fn parse_args(args: &[OsString]) -> Result<Action> {
let mut help = false;
let mut version = false;
let mut shim_args = Args::default();
// Crate `go_flag` is used to keep compatible with go/flag package.
let rest_args = go_flag::parse_args_with_warnings::<String, _, _>(&args[1..], None, |flags| {
flags.add_flag("address", &mut shim_args.address);
flags.add_flag("bundle", &mut shim_args.bundle);
flags.add_flag("debug", &mut shim_args.debug);
flags.add_flag("id", &mut shim_args.id);
flags.add_flag("namespace", &mut shim_args.namespace);
flags.add_flag("publish-binary", &mut shim_args.publish_binary);
flags.add_flag("socket", &mut shim_args.socket);
flags.add_flag("help", &mut help);
flags.add_flag("version", &mut version);
})
.context(Error::ParseArgument(format!("{:?}", args)))?;
if help {
Ok(Action::Help)
} else if version {
Ok(Action::Version)
} else if rest_args.is_empty() {
Ok(Action::Run(shim_args))
} else if rest_args[0] == "start" {
Ok(Action::Start(shim_args))
} else if rest_args[0] == "delete" {
Ok(Action::Delete(shim_args))
} else {
Err(anyhow!(Error::InvalidArgument))
}
}
fn show_help(cmd: &OsStr) {
let path = PathBuf::from(cmd);
let name = match path.file_name() {
Some(v) => v.to_str(),
None => None,
};
let name = name.unwrap_or(config::RUNTIME_NAME);
println!(
r#"Usage of {}:
-address string
grpc address back to main containerd
-bundle string
path to the bundle if not workdir
-debug
enable debug output in logs
-id string
id of the task
-namespace string
namespace that owns the shim
-publish-binary string
path to publish binary (used for publishing events) (default "containerd")
-socket string
socket path to serve
--version
show the runtime version detail and exit
"#,
name
);
}
fn show_version(err: Option<anyhow::Error>) {
let data = format!(
r#"{} containerd shim: id: {}, version: {}, commit: {}"#,
config::PROJECT_NAME,
config::CONTAINERD_RUNTIME_NAME,
config::RUNTIME_VERSION,
config::RUNTIME_VERSION_COMMIT,
);
if let Some(err) = err {
eprintln!(
"{}\r\nERROR: {} failed: {:?}",
data,
config::RUNTIME_NAME,
err
);
} else {
println!("{}", data)
}
}
fn get_tokio_runtime() -> Result<tokio::runtime::Runtime> {
let worker_threads = std::env::var(ENV_RUNTIME_WORKER_THREADS)
.unwrap_or_default()
.parse()
.unwrap_or(DEFAULT_RUNTIME_WORKER_THREADS);
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.enable_all()
.build()
.context("prepare tokio runtime")?;
Ok(rt)
}
fn real_main() -> Result<()> {
let args = std::env::args_os().collect::<Vec<_>>();
if args.is_empty() {
return Err(anyhow!(Error::ArgumentIsEmpty(
"command-line arguments".to_string()
)));
}
let action = parse_args(&args).context("parse args")?;
match action {
Action::Start(args) => ShimExecutor::new(args).start().context("shim start")?,
Action::Delete(args) => ShimExecutor::new(args).delete().context("shim delete")?,
Action::Run(args) => {
// set mnt namespace
// need setup before other async call
setup_mnt().context("setup mnt")?;
let mut shim = ShimExecutor::new(args);
let rt = get_tokio_runtime().context("get tokio runtime")?;
rt.block_on(shim.run())?
}
Action::Help => show_help(&args[0]),
Action::Version => show_version(None),
}
Ok(())
}
fn main() {
if let Err(err) = real_main() {
show_version(Some(err));
}
}
fn setup_mnt() -> Result<()> {
// Unshare the mount namespace, so that the calling process has a private copy of its namespace
// which is not shared with any other process.
sched::unshare(CloneFlags::CLONE_NEWNS).context("unshare clone newns")?;
// Mount and unmount events propagate into this mount from the (master) shared peer group of
// which it was formerly a member. Mount and unmount events under this mount do not propagate
// to any peer.
mount(
Some("none"),
"/",
Some(""),
MsFlags::MS_REC | MsFlags::MS_SLAVE,
Some(""),
)
.context("mount with slave")?;
// Mount and unmount events immediately under this mount will propagate to the other mounts
// that are members of this mount's peer group.
mount(
Some("none"),
"/",
Some(""),
MsFlags::MS_REC | MsFlags::MS_SHARED,
Some(""),
)
.context("mount with shared")?;
Ok(())
}

View File

@@ -0,0 +1,19 @@
// Copyright (c) 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
//
// WARNING: This file is auto-generated - DO NOT EDIT!
//
#![allow(dead_code)]
pub const PROJECT_NAME: &str = "@PROJECT_NAME@";
pub const RUNTIME_VERSION: &str = "@RUNTIME_VERSION@";
pub const RUNTIME_VERSION_COMMIT: &str = "@VERSION_COMMIT@";
pub const RUNTIME_GIT_COMMIT: &str = "@COMMIT@";
pub const RUNTIME_NAME: &str = "@RUNTIME_NAME@";
pub const CONTAINERD_RUNTIME_NAME: &str = "@CONTAINERD_RUNTIME_NAME@";
pub const RUNTIME_DIR: &str = "@BINDIR@";
pub const RUNTIME_PATH: &str = "@BINDIR@/@RUNTIME_NAME@";

View File

@@ -0,0 +1,52 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::path::PathBuf;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("failed to parse argument {0}")]
ParseArgument(String),
#[error("failed to get bundle path")]
GetBundlePath,
#[error("invalid argument")]
InvalidArgument,
#[error("argument is empty {0}")]
ArgumentIsEmpty(String),
#[error("invalid path {0}")]
InvalidPath(String),
// File
#[error("failed to open file {0}")]
FileOpen(String),
#[error("failed to get file metadata {0}")]
FileGetMetadata(String),
#[error("failed to read file {0}")]
FileRead(String),
#[error("failed to write file {0}")]
FileWrite(String),
#[error("empty sandbox id")]
EmptySandboxId,
#[error("failed to get self exec: {0}")]
SelfExec(#[source] std::io::Error),
#[error("failed to bind socket at {1} with error: {0}")]
BindSocket(#[source] std::io::Error, PathBuf),
#[error("failed to spawn child: {0}")]
SpawnChild(#[source] std::io::Error),
#[error("failed to clean container {0}")]
CleanUpContainer(String),
#[error("failed to get env variable: {0}")]
EnvVar(#[source] std::env::VarError),
#[error("failed to parse server fd environment variable {0}")]
ServerFd(String),
#[error("failed to wait ttrpc server when {0}")]
WaitServer(String),
#[error("failed to get system time: {0}")]
SystemTime(#[source] std::time::SystemTimeError),
#[error("failed to parse pid")]
ParsePid,
}

View File

@@ -0,0 +1,28 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
macro_rules! sl {
() => {
slog_scope::logger().new(slog::o!("subsystem" => "shim"))
};
}
mod args;
pub use args::Args;
mod error;
pub use error::Error;
mod logger;
mod panic_hook;
mod shim;
pub use shim::ShimExecutor;
#[rustfmt::skip]
pub mod config;
mod shim_delete;
mod shim_run;
mod shim_start;

View File

@@ -0,0 +1,41 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::os::unix::fs::OpenOptionsExt;
use anyhow::{Context, Result};
use crate::Error;
pub(crate) fn set_logger(path: &str, sid: &str, is_debug: bool) -> Result<slog_async::AsyncGuard> {
let fifo = std::fs::OpenOptions::new()
.custom_flags(libc::O_NONBLOCK)
.create(true)
.write(true)
.append(true)
.open(path)
.context(Error::FileOpen(path.to_string()))?;
let level = if is_debug {
slog::Level::Debug
} else {
slog::Level::Info
};
let (logger, async_guard) = logging::create_logger("kata-runtime", sid, level, fifo);
// not reset global logger when drop
slog_scope::set_global_logger(logger).cancel_reset();
let level = if is_debug {
log::Level::Debug
} else {
log::Level::Info
};
let _ = slog_stdlog::init_with_level(level).context(format!("init with level {}", level))?;
Ok(async_guard)
}

View File

@@ -0,0 +1,41 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{boxed::Box, ops::Deref};
use backtrace::Backtrace;
// TODO: the Kata 1.x runtime had a SIGUSR1 handler that would log a formatted backtrace on
// receiving that signal. It could be useful to re-add that feature.
pub(crate) fn set_panic_hook() {
std::panic::set_hook(Box::new(move |panic_info| {
let (filename, line) = panic_info
.location()
.map(|loc| (loc.file(), loc.line()))
.unwrap_or(("<unknown>", 0));
let cause = panic_info
.payload()
.downcast_ref::<String>()
.map(std::string::String::deref);
let cause = cause.unwrap_or_else(|| {
panic_info
.payload()
.downcast_ref::<&str>()
.copied()
.unwrap_or("<cause unknown>")
});
let bt = Backtrace::new();
let bt_data = format!("{:?}", bt);
error!(
sl!(),
"A panic occurred at {}:{}: {}\r\n{:?}", filename, line, cause, bt_data
);
std::process::abort();
}));
}

View File

@@ -0,0 +1,116 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
os::unix::ffi::OsStrExt,
path::{Path, PathBuf},
};
use anyhow::{anyhow, Context, Result};
use sha2::Digest;
use crate::{Args, Error};
const SOCKET_ROOT: &str = "/run/containerd";
const SHIM_PID_FILE: &str = "shim.pid";
pub(crate) const ENV_KATA_RUNTIME_BIND_FD: &str = "KATA_RUNTIME_BIND_FD";
/// Command executor for shim.
pub struct ShimExecutor {
pub(crate) args: Args,
}
impl ShimExecutor {
/// Create a new instance of [`Shim`].
pub fn new(args: Args) -> Self {
ShimExecutor { args }
}
pub(crate) fn load_oci_spec(&self) -> Result<oci::Spec> {
let bundle_path = self.get_bundle_path()?;
let spec_file = bundle_path.join("config.json");
oci::Spec::load(spec_file.to_str().unwrap_or_default()).context("load spec")
}
pub(crate) fn write_address(&self, address: &Path) -> Result<()> {
let dir = self.get_bundle_path()?;
let file_path = &dir.join("address");
std::fs::write(file_path, address.as_os_str().as_bytes())
.context(Error::FileWrite(format!("{:?}", &file_path)))
}
pub(crate) fn write_pid_file(&self, pid: u32) -> Result<()> {
let dir = self.get_bundle_path()?;
let file_path = &dir.join(SHIM_PID_FILE);
std::fs::write(file_path, format!("{}", pid))
.context(Error::FileWrite(format!("{:?}", &file_path)))
}
pub(crate) fn read_pid_file(&self, bundle_path: &Path) -> Result<u32> {
let file_path = bundle_path.join(SHIM_PID_FILE);
let data = std::fs::read_to_string(&file_path)
.context(Error::FileOpen(format!("{:?}", file_path)))?;
data.parse::<u32>().context(Error::ParsePid)
}
pub(crate) fn get_bundle_path(&self) -> Result<PathBuf> {
std::env::current_dir().context(Error::GetBundlePath)
}
pub(crate) fn socket_address(&self, id: &str) -> Result<PathBuf> {
if id.is_empty() {
return Err(anyhow!(Error::EmptySandboxId));
}
let data = [&self.args.address, &self.args.namespace, id].join("/");
let mut hasher = sha2::Sha256::new();
hasher.update(data);
Ok(PathBuf::from(format!(
"unix://{}/s/{:X}",
SOCKET_ROOT,
hasher.finalize()
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
#[test]
#[serial]
fn test_shim_executor() {
let dir = tempfile::tempdir().unwrap();
let bundle_path = dir.path();
std::env::set_current_dir(bundle_path).unwrap();
let args = Args {
id: "1dfc0567".to_string(),
namespace: "test_namespace".into(),
address: "containerd_socket".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path.to_str().unwrap().into(),
debug: false,
};
let executor = ShimExecutor::new(args);
executor.write_address(Path::new("12345")).unwrap();
let dir = executor.get_bundle_path().unwrap();
let file_path = &dir.join("address");
let buf = std::fs::read_to_string(file_path).unwrap();
assert_eq!(&buf, "12345");
executor.write_pid_file(1267).unwrap();
let read_pid = executor.read_pid_file(&dir).unwrap();
assert_eq!(read_pid, 1267);
}
}

View File

@@ -0,0 +1,71 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{Context, Result};
use containerd_shim_protos::shim::shim::DeleteResponse;
use protobuf::Message;
use crate::{shim::ShimExecutor, Error};
impl ShimExecutor {
pub fn delete(&mut self) -> Result<()> {
self.args.validate(true).context("validate")?;
let rsp = self.do_cleanup().context("do cleanup")?;
rsp.write_to_writer(&mut std::io::stdout())
.context(Error::FileWrite(format!("write {:?} to stdout", rsp)))?;
Ok(())
}
fn do_cleanup(&self) -> Result<DeleteResponse> {
let mut rsp = DeleteResponse::new();
rsp.set_exit_status(128 + libc::SIGKILL as u32);
let mut exited_time = protobuf::well_known_types::Timestamp::new();
let seconds = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(Error::SystemTime)?
.as_secs() as i64;
exited_time.set_seconds(seconds);
rsp.set_exited_at(exited_time);
// TODO: implement cleanup
Ok(rsp)
}
}
#[cfg(test)]
mod tests {
use serial_test::serial;
use tests_utils::gen_id;
use super::*;
use crate::Args;
#[test]
#[serial]
fn test_shim_delete() {
let dir = tempfile::tempdir().unwrap();
let bundle_path = dir.path();
std::env::set_current_dir(bundle_path).unwrap();
let id = gen_id(16);
let namespace = gen_id(16);
let args = Args {
id,
namespace,
address: "containerd_socket".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path.to_str().unwrap().into(),
debug: false,
};
let executor = ShimExecutor::new(args);
let resp = executor.do_cleanup().unwrap();
assert_eq!(resp.exit_status, 128 + libc::SIGKILL as u32);
assert!(resp.exited_at.as_ref().unwrap().seconds > 0);
}
}

View File

@@ -0,0 +1,54 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::os::unix::io::RawFd;
use anyhow::{Context, Result};
use crate::{
logger,
shim::{ShimExecutor, ENV_KATA_RUNTIME_BIND_FD},
Error,
};
impl ShimExecutor {
pub async fn run(&mut self) -> Result<()> {
crate::panic_hook::set_panic_hook();
let sid = self.args.id.clone();
let bundle_path = self.get_bundle_path().context("get bundle")?;
let path = bundle_path.join("log");
let _logger_guard =
logger::set_logger(path.to_str().unwrap(), &sid, self.args.debug).context("set logger");
self.do_run()
.await
.map_err(|err| {
error!(sl!(), "failed run shim {:?}", err);
err
})
.context("run shim")?;
Ok(())
}
async fn do_run(&mut self) -> Result<()> {
info!(sl!(), "start to run");
self.args.validate(false).context("validata")?;
let _server_fd = get_server_fd().context("get server fd")?;
// TODO: implement run
Ok(())
}
}
fn get_server_fd() -> Result<RawFd> {
let env_fd = std::env::var(ENV_KATA_RUNTIME_BIND_FD).map_err(Error::EnvVar)?;
let fd = env_fd
.parse::<RawFd>()
.map_err(|_| Error::ServerFd(env_fd))?;
Ok(fd)
}

View File

@@ -0,0 +1,238 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
fs,
io::Write,
os::unix::{io::IntoRawFd, prelude::OsStrExt},
path::{Path, PathBuf},
};
use anyhow::{anyhow, Context, Result};
use kata_types::{container::ContainerType, k8s};
use unix_socket::UnixListener;
use crate::{
shim::{ShimExecutor, ENV_KATA_RUNTIME_BIND_FD},
Error,
};
impl ShimExecutor {
pub fn start(&mut self) -> Result<()> {
self.args.validate(false).context("validate")?;
let address = self.do_start().context("do start")?;
std::io::stdout()
.write_all(address.as_os_str().as_bytes())
.context("failed to write stdout")?;
Ok(())
}
fn do_start(&mut self) -> Result<PathBuf> {
let spec = self.load_oci_spec()?;
let (container_type, id) = k8s::container_type_with_id(&spec);
match container_type {
ContainerType::PodSandbox => {
let address = self.socket_address(&self.args.id)?;
let socket = new_listener(&address)?;
let child_pid = self.create_shim_process(socket)?;
self.write_pid_file(child_pid)?;
self.write_address(&address)?;
Ok(address)
}
ContainerType::PodContainer => {
let sid = id
.ok_or(Error::InvalidArgument)
.context("get sid for container")?;
let (address, pid) = self.get_shim_info_from_sandbox(&sid)?;
self.write_pid_file(pid)?;
self.write_address(&address)?;
Ok(address)
}
}
}
fn new_command(&self) -> Result<std::process::Command> {
if self.args.id.is_empty()
|| self.args.namespace.is_empty()
|| self.args.address.is_empty()
|| self.args.publish_binary.is_empty()
{
return Err(anyhow!("invalid param"));
}
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let self_exec = std::env::current_exe().map_err(Error::SelfExec)?;
let mut command = std::process::Command::new(self_exec);
command
.current_dir(bundle_path)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.arg("-id")
.arg(&self.args.id)
.arg("-namespace")
.arg(&self.args.namespace)
.arg("-address")
.arg(&self.args.address)
.arg("-publish-binary")
.arg(&self.args.publish_binary)
.env("RUST_BACKTRACE", "1");
if self.args.debug {
command.arg("-debug");
}
Ok(command)
}
fn create_shim_process<T: IntoRawFd>(&self, socket: T) -> Result<u32> {
let mut cmd = self.new_command().context("new command")?;
cmd.env(
ENV_KATA_RUNTIME_BIND_FD,
format!("{}", socket.into_raw_fd()),
);
let child = cmd
.spawn()
.map_err(Error::SpawnChild)
.context("spawn child")?;
Ok(child.id())
}
fn get_shim_info_from_sandbox(&self, sandbox_id: &str) -> Result<(PathBuf, u32)> {
// All containers of a pod share the same pod socket address.
let address = self.socket_address(sandbox_id).context("socket address")?;
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let parent_bundle_path = Path::new(&bundle_path)
.parent()
.unwrap_or_else(|| Path::new(""));
let sandbox_bundle_path = parent_bundle_path
.join(sandbox_id)
.canonicalize()
.context(Error::GetBundlePath)?;
let pid = self.read_pid_file(&sandbox_bundle_path)?;
Ok((address, pid))
}
}
fn new_listener(address: &Path) -> Result<UnixListener> {
let trim_path = address.strip_prefix("unix:").context("trim path")?;
let file_path = Path::new("/").join(trim_path);
let file_path = file_path.as_path();
if let Some(parent_dir) = file_path.parent() {
fs::create_dir_all(parent_dir).context("create parent dir")?;
}
UnixListener::bind(file_path).context("bind address")
}
#[cfg(test)]
mod tests {
use std::path::Path;
use serial_test::serial;
use tests_utils::gen_id;
use super::*;
use crate::Args;
#[test]
#[serial]
fn test_new_command() {
let dir = tempfile::tempdir().unwrap();
let bundle_path = dir.path();
std::env::set_current_dir(bundle_path).unwrap();
let args = Args {
id: "sandbox1".into(),
namespace: "ns".into(),
address: "address".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path.to_str().unwrap().into(),
debug: false,
};
let mut executor = ShimExecutor::new(args);
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 8);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
executor.args.debug = true;
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 9);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
}
#[test]
#[serial]
fn test_get_info_from_sandbox() {
let dir = tempfile::tempdir().unwrap();
let sandbox_id = gen_id(16);
let bundle_path = &dir.path().join(&sandbox_id);
std::fs::create_dir(bundle_path).unwrap();
std::env::set_current_dir(bundle_path).unwrap();
let args = Args {
id: sandbox_id.to_owned(),
namespace: "ns1".into(),
address: "containerd_socket".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path.to_str().unwrap().into(),
debug: false,
};
let executor = ShimExecutor::new(args);
let addr = executor.socket_address(&executor.args.id).unwrap();
executor.write_address(&addr).unwrap();
executor.write_pid_file(1267).unwrap();
let container_id = gen_id(16);
let bundle_path2 = &dir.path().join(&container_id);
std::fs::create_dir(bundle_path2).unwrap();
std::env::set_current_dir(bundle_path2).unwrap();
let args = Args {
id: container_id,
namespace: "ns1".into(),
address: "containerd_socket".into(),
publish_binary: "containerd".into(),
socket: "socket".into(),
bundle: bundle_path2.to_str().unwrap().into(),
debug: false,
};
let executor2 = ShimExecutor::new(args);
let (address, pid) = executor2.get_shim_info_from_sandbox(&sandbox_id).unwrap();
assert_eq!(pid, 1267);
assert_eq!(&address, &addr);
}
#[test]
#[serial]
fn test_new_listener() {
let path = "/tmp/aaabbbccc";
let uds_path = format!("unix://{}", path);
std::fs::remove_file(path).ok();
let _ = new_listener(Path::new(&uds_path)).unwrap();
std::fs::remove_file(path).ok();
}
}