Merge pull request #923 from liubin/fix/simplify-codes

agent: simplify codes
This commit is contained in:
Peng Tao 2020-10-13 09:54:46 +08:00 committed by GitHub
commit 16a6427ca9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 192 additions and 571 deletions

View File

@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0
//
// use crate::configs::{FreezerState, Config};
use anyhow::{anyhow, Result};
use oci::LinuxResources;
use protocols::agent::CgroupStats;

View File

@ -366,128 +366,3 @@ impl IfPrioMap {
format!("{} {}", self.interface, self.priority)
}
}
/*
impl Config {
fn new(opts: &CreateOpts) -> Result<Self> {
if opts.spec.is_none() {
return Err(ErrorKind::ErrorCode("invalid createopts!".into()));
}
let root = unistd::getcwd().chain_err(|| "cannot getwd")?;
let root = root.as_path().canonicalize().chain_err(||
"cannot resolve root into absolute path")?;
let mut root = root.into();
let cwd = root.clone();
let spec = opts.spec.as_ref().unwrap();
if spec.root.is_none() {
return Err(ErrorKind::ErrorCode("no root".into()));
}
let rootfs = PathBuf::from(&spec.root.as_ref().unwrap().path);
if rootfs.is_relative() {
root = format!("{}/{}", root, rootfs.into());
}
// handle annotations
let mut label = spec.annotations
.iter()
.map(|(key, value)| format!("{}={}", key, value)).collect();
label.push(format!("bundle={}", cwd));
let mut config = Config {
rootfs: root,
no_pivot_root: opts.no_pivot_root,
readonlyfs: spec.root.as_ref().unwrap().readonly,
hostname: spec.hostname.clone(),
labels: label,
no_new_keyring: opts.no_new_keyring,
rootless_euid: opts.rootless_euid,
rootless_cgroups: opts.rootless_cgroups,
};
config.mounts = Vec::new();
for m in &spec.mounts {
config.mounts.push(Mount::new(&cwd, &m)?);
}
config.devices = create_devices(&spec)?;
config.cgroups = Cgroups::new(&opts)?;
if spec.linux.as_ref().is_none() {
return Err(ErrorKind::ErrorCode("no linux configuration".into()));
}
let linux = spec.linux.as_ref().unwrap();
let propagation = MOUNTPROPAGATIONMAPPING.get(linux.rootfs_propagation);
if propagation.is_none() {
Err(ErrorKind::ErrorCode("rootfs propagation not support".into()));
}
config.root_propagation = propagation.unwrap();
if config.no_pivot_root && (config.root_propagation & MSFlags::MSPRIVATE != 0) {
return Err(ErrorKind::ErrorCode("[r]private is not safe without pivot root".into()));
}
// handle namespaces
let m: HashMap<String, String> = HashMap::new();
for ns in &linux.namespaces {
if NAMESPACEMAPPING.get(&ns.r#type.as_str()).is_none() {
return Err(ErrorKind::ErrorCode("namespace don't exist".into()));
}
if m.get(&ns.r#type).is_some() {
return Err(ErrorKind::ErrorCode(format!("duplicate ns {}", ns.r#type)));
}
m.insert(ns.r#type, ns.path);
}
if m.contains_key(oci::NETWORKNAMESPACE) {
let path = m.get(oci::NETWORKNAMESPACE).unwrap();
if path == "" {
config.networks = vec![Network {
r#type: "loopback",
}];
}
}
if m.contains_key(oci::USERNAMESPACE) {
setup_user_namespace(&spec, &mut config)?;
}
config.namespaces = m.iter().map(|(key, value)| Namespace {
r#type: key,
path: value,
}).collect();
config.mask_paths = linux.mask_paths;
config.readonly_path = linux.readonly_path;
config.mount_label = linux.mount_label;
config.sysctl = linux.sysctl;
config.seccomp = None;
config.intelrdt = None;
if spec.process.is_some() {
let process = spec.process.as_ref().unwrap();
config.oom_score_adj = process.oom_score_adj;
config.process_label = process.selinux_label.clone();
if process.capabilities.as_ref().is_some() {
let cap = process.capabilities.as_ref().unwrap();
config.capabilities = Some(Capabilities {
..cap
})
}
}
config.hooks = None;
config.version = spec.version;
Ok(config)
}
}
impl Mount {
fn new(cwd: &str, m: &oci::Mount) -> Result<Self> {
}
}
*/

View File

@ -3,35 +3,32 @@
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{anyhow, bail, Context, Result};
use dirs;
use lazy_static;
use libc::pid_t;
use oci::{Hook, Linux, LinuxNamespace, LinuxResources, POSIXRlimit, Spec};
use oci::{LinuxDevice, LinuxIDMapping};
use serde_json;
use std::clone::Clone;
use std::ffi::{CStr, CString};
use std::fmt;
use std::fmt::Display;
use std::fs;
use std::os::unix::io::RawFd;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
// use crate::sync::Cond;
use anyhow::{anyhow, bail, Context, Result};
use libc::pid_t;
use oci::{LinuxDevice, LinuxIDMapping};
use std::clone::Clone;
use std::fmt::Display;
use std::process::Command;
use std::time::SystemTime;
use cgroups::freezer::FreezerState;
use crate::process::Process;
// use crate::intelrdt::Manager as RdtManager;
use crate::log_child;
use crate::specconv::CreateOpts;
use crate::sync::*;
// use crate::stats::Stats;
use crate::capabilities::{self, CAPSMAP};
use crate::cgroups::fs::Manager as FsManager;
use crate::cgroups::Manager;
use crate::log_child;
use crate::process::Process;
use crate::specconv::CreateOpts;
use crate::sync::*;
use crate::{mount, validator};
use protocols::agent::StatsContainerResponse;
@ -225,11 +222,6 @@ pub struct BaseState {
init_process_pid: i32,
#[serde(default)]
init_process_start: u64,
/*
#[serde(default)]
created: SystemTime,
config: Config,
*/
}
pub trait BaseContainer {
@ -291,12 +283,8 @@ pub struct SyncPC {
}
pub trait Container: BaseContainer {
// fn checkpoint(&self, opts: &CriuOpts) -> Result<()>;
// fn restore(&self, p: &Process, opts: &CriuOpts) -> Result<()>;
fn pause(&mut self) -> Result<()>;
fn resume(&mut self) -> Result<()>;
// fn notify_oom(&self) -> Result<(Sender, Receiver)>;
// fn notify_memory_pressure(&self, lvl: PressureLevel) -> Result<(Sender, Receiver)>;
}
impl Container for LinuxContainer {
@ -399,7 +387,7 @@ fn do_init_child(cwfd: RawFd) -> Result<()> {
let linux = spec.linux.as_ref().unwrap();
// get namespace vector to join/new
let nses = get_namespaces(&linux)?;
let nses = get_namespaces(&linux);
let mut userns = false;
let mut to_new = CloneFlags::empty();
@ -627,7 +615,7 @@ fn do_init_child(cwfd: RawFd) -> Result<()> {
fifofd = std::env::var(FIFO_FD)?.parse::<i32>().unwrap();
}
//cleanup the env inherited from parent
// cleanup the env inherited from parent
for (key, _) in env::vars() {
env::remove_var(key);
}
@ -636,7 +624,6 @@ fn do_init_child(cwfd: RawFd) -> Result<()> {
for e in env.iter() {
let v: Vec<&str> = e.splitn(2, "=").collect();
if v.len() != 2 {
//info!(logger, "incorrect env config!");
continue;
}
env::set_var(v[0], v[1]);
@ -780,7 +767,6 @@ impl BaseContainer for LinuxContainer {
return Err(anyhow!("exec fifo exists"));
}
unistd::mkfifo(fifo_file.as_str(), Mode::from_bits(0o622).unwrap())?;
// defer!(fs::remove_file(&fifo_file)?);
fifofd = fcntl::open(
fifo_file.as_str(),
@ -1089,8 +1075,6 @@ fn do_exec(args: &[String]) -> ! {
let a: Vec<&CStr> = sa.iter().map(|s| s.as_c_str()).collect();
if let Err(e) = unistd::execvp(p.as_c_str(), a.as_slice()) {
// info!(logger, "execve failed!!!");
// info!(logger, "binary: {:?}, args: {:?}, envs: {:?}", p, a, env);
match e {
nix::Error::Sys(errno) => {
std::process::exit(errno as i32);
@ -1156,24 +1140,21 @@ fn get_pid_namespace(logger: &Logger, linux: &Linux) -> Result<Option<RawFd>> {
}
fn is_userns_enabled(linux: &Linux) -> bool {
for ns in &linux.namespaces {
if ns.r#type == "user" && ns.path == "" {
return true;
}
}
false
linux
.namespaces
.iter()
.any(|ns| ns.r#type == "user" && ns.path == "")
}
fn get_namespaces(linux: &Linux) -> Result<Vec<LinuxNamespace>> {
let mut ns: Vec<LinuxNamespace> = Vec::new();
for i in &linux.namespaces {
ns.push(LinuxNamespace {
r#type: i.r#type.clone(),
path: i.path.clone(),
});
}
Ok(ns)
fn get_namespaces(linux: &Linux) -> Vec<LinuxNamespace> {
linux
.namespaces
.iter()
.map(|ns| LinuxNamespace {
r#type: ns.r#type.clone(),
path: ns.path.clone(),
})
.collect()
}
fn join_namespaces(
@ -1198,7 +1179,6 @@ fn join_namespaces(
info!(logger, "wait child received oci spec");
// child.try_wait()?;
read_sync(prfd)?;
info!(logger, "send oci process from parent to child");
@ -1211,7 +1191,7 @@ fn join_namespaces(
let cm_str = serde_json::to_string(cm)?;
write_sync(pwfd, SYNC_DATA, cm_str.as_str())?;
//wait child setup user namespace
// wait child setup user namespace
info!(logger, "wait child setup user namespace");
read_sync(prfd)?;
@ -1270,7 +1250,7 @@ fn join_namespaces(
read_sync(prfd)?;
info!(logger, "get ready to run poststart hook!");
//run poststart hook
// run poststart hook
if spec.hooks.is_some() {
info!(logger, "poststart hook");
let hooks = spec.hooks.as_ref().unwrap();
@ -1287,15 +1267,12 @@ fn join_namespaces(
}
fn write_mappings(logger: &Logger, path: &str, maps: &[LinuxIDMapping]) -> Result<()> {
let mut data = String::new();
for m in maps {
if m.size == 0 {
continue;
}
let val = format!("{} {} {}\n", m.container_id, m.host_id, m.size);
data = data + &val;
}
let data = maps
.iter()
.filter(|m| m.size != 0)
.map(|m| format!("{} {} {}\n", m.container_id, m.host_id, m.size))
.collect::<Vec<_>>()
.join("");
info!(logger, "mapping: {}", data);
if !data.is_empty() {
@ -1508,7 +1485,6 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
let args = h.args.clone();
let envs = h.env.clone();
let state = serde_json::to_string(st)?;
// state.push_str("\n");
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
defer!({
@ -1528,9 +1504,6 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
info!(logger, "hook child: {} status: {}", child, status);
// let _ = wait::waitpid(_ch,
// Some(WaitPidFlag::WEXITED | WaitPidFlag::__WALL));
if status != 0 {
if status == -libc::ETIMEDOUT {
return Err(anyhow!(nix::Error::from_errno(Errno::ETIMEDOUT)));
@ -1571,7 +1544,7 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
.spawn()
.unwrap();
//send out our pid
// send out our pid
tx.send(child.id() as libc::pid_t).unwrap();
info!(logger, "hook grand: {}", child.id());
@ -1590,7 +1563,7 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
.unwrap()
.read_to_string(&mut out)
.unwrap();
info!(logger, "{}", out.as_str());
info!(logger, "child stdout: {}", out.as_str());
match child.wait() {
Ok(exit) => {
let code: i32 = if exit.success() {
@ -1660,8 +1633,6 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
SYNC_DATA,
std::str::from_utf8(&status.to_be_bytes()).unwrap_or_default(),
);
// let _ = wait::waitpid(Pid::from_raw(pid),
// Some(WaitPidFlag::WEXITED | WaitPidFlag::__WALL));
std::process::exit(0);
}
}

View File

@ -554,11 +554,6 @@ pub fn grpc_to_oci(grpc: &grpcSpec) -> ociSpec {
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
#[allow(unused_macros)]
#[macro_export]
macro_rules! skip_if_not_root {

View File

@ -912,8 +912,6 @@ fn mask_path(path: &str) -> Result<()> {
return Err(nix::Error::Sys(Errno::EINVAL).into());
}
//info!("{}", path);
match mount(
Some("/dev/null"),
path,
@ -929,7 +927,6 @@ fn mask_path(path: &str) -> Result<()> {
}
Err(e) => {
//info!("{}: {}", path, e.as_errno().unwrap().desc());
return Err(e.into());
}
@ -944,8 +941,6 @@ fn readonly_path(path: &str) -> Result<()> {
return Err(nix::Error::Sys(Errno::EINVAL).into());
}
//info!("{}", path);
match mount(
Some(&path[1..]),
path,
@ -963,7 +958,6 @@ fn readonly_path(path: &str) -> Result<()> {
}
Err(e) => {
//info!("{}: {}", path, e.as_errno().unwrap().desc());
return Err(e.into());
}

View File

@ -3,16 +3,11 @@
// SPDX-License-Identifier: Apache-2.0
//
// use std::process::{Stdio, Command, ExitStatus};
use libc::pid_t;
use std::fs::File;
use std::os::unix::io::RawFd;
use std::sync::mpsc::Sender;
// use crate::configs::{Capabilities, Rlimit};
// use crate::cgroups::Manager as CgroupManager;
// use crate::intelrdt::Manager as RdtManager;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::sys::signal::{self, Signal};
use nix::sys::wait::{self, WaitStatus};
@ -31,8 +26,6 @@ pub struct Process {
pub exit_pipe_r: Option<RawFd>,
pub exit_pipe_w: Option<RawFd>,
pub extra_files: Vec<File>,
// pub caps: Capabilities,
// pub rlimits: Vec<Rlimit>,
pub term_master: Option<RawFd>,
pub tty: bool,
pub parent_stdin: Option<RawFd>,

View File

@ -4,8 +4,6 @@
//
use oci::Spec;
// use crate::configs::namespaces;
// use crate::configs::device::Device;
#[derive(Debug)]
pub struct CreateOpts {
@ -17,143 +15,3 @@ pub struct CreateOpts {
pub rootless_euid: bool,
pub rootless_cgroup: bool,
}
/*
const WILDCARD: i32 = -1;
lazy_static! {
static ref NAEMSPACEMAPPING: HashMap<&'static str, &'static str> = {
let mut m = HashMap::new();
m.insert(oci::PIDNAMESPACE, namespaces::NEWPID);
m.insert(oci::NETWORKNAMESPACE, namespaces::NEWNET);
m.insert(oci::UTSNAMESPACE, namespaces::NEWUTS);
m.insert(oci::MOUNTNAMESPACE, namespaces::NEWNS);
m.insert(oci::IPCNAMESPACE, namespaces::NEWIPC);
m.insert(oci::USERNAMESPACE, namespaces::NEWUSER);
m.insert(oci::CGROUPNAMESPACE, namespaces::NEWCGROUP);
m
};
static ref MOUNTPROPAGATIONMAPPING: HashMap<&'static str, MsFlags> = {
let mut m = HashMap::new();
m.insert("rprivate", MsFlags::MS_PRIVATE | MsFlags::MS_REC);
m.insert("private", MsFlags::MS_PRIVATE);
m.insert("rslave", MsFlags::MS_SLAVE | MsFlags::MS_REC);
m.insert("slave", MsFlags::MS_SLAVE);
m.insert("rshared", MsFlags::MS_SHARED | MsFlags::MS_REC);
m.insert("shared", MsFlags::MS_SHARED);
m.insert("runbindable", MsFlags::MS_UNBINDABLE | MsFlags::MS_REC);
m.insert("unbindable", MsFlags::MS_UNBINDABLE);
m
};
static ref ALLOWED_DEVICES: Vec<Device> = {
let mut m = Vec::new();
m.push(Device {
r#type: 'c',
major: WILDCARD,
minor: WILDCARD,
permissions: "m",
allow: true,
});
m.push(Device {
r#type: 'b',
major: WILDCARD,
minor: WILDCARD,
permissions: "m",
allow: true,
});
m.push(Device {
r#type: 'c',
path: "/dev/null".to_string(),
major: 1,
minor: 3,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/random"),
major: 1,
minor: 8,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/full"),
major: 1,
minor: 7,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/tty"),
major: 5,
minor: 0,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/zero"),
major: 1,
minor: 5,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/urandom"),
major: 1,
minor: 9,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from("/dev/console"),
major: 5,
minor: 1,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from(""),
major: 136,
minor: WILDCARD,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from(""),
major: 5,
minor: 2,
permissions: "rwm",
allow: true,
});
m.push(Device {
r#type: 'c',
path: String::from(""),
major: 10,
minor: 200,
permissions: "rwm",
allow: true,
});
m
};
}
*/

View File

@ -125,7 +125,7 @@ lazy_static! {
// type of storage driver.
type StorageHandler = fn(&Logger, &Storage, Arc<Mutex<Sandbox>>) -> Result<String>;
// StorageHandlerList lists the supported drivers.
// STORAGEHANDLERLIST lists the supported drivers.
#[cfg_attr(rustfmt, rustfmt_skip)]
lazy_static! {
pub static ref STORAGEHANDLERLIST: HashMap<&'static str, StorageHandler> = {
@ -510,7 +510,7 @@ pub fn get_mount_fs_type(mount_point: &str) -> Result<String> {
get_mount_fs_type_from_file(PROC_MOUNTSTATS, mount_point)
}
// get_mount_fs_type returns the FS type corresponding to the passed mount point and
// get_mount_fs_type_from_file returns the FS type corresponding to the passed mount point and
// any error ecountered.
pub fn get_mount_fs_type_from_file(mount_file: &str, mount_point: &str) -> Result<String> {
if mount_point == "" {
@ -643,7 +643,7 @@ pub fn cgroups_mount(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<
// Enable memory hierarchical account.
// For more information see https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
online_device("/sys/fs/cgroup/memory//memory.use_hierarchy")?;
online_device("/sys/fs/cgroup/memory/memory.use_hierarchy")?;
Ok(())
}
@ -654,8 +654,8 @@ pub fn remove_mounts(mounts: &Vec<String>) -> Result<()> {
Ok(())
}
// ensureDestinationExists will recursively create a given mountpoint. If directories
// are created, their permissions are initialized to mountPerm
// ensure_destination_exists will recursively create a given mountpoint. If directories
// are created, their permissions are initialized to mountPerm(0755)
fn ensure_destination_exists(destination: &str, fs_type: &str) -> Result<()> {
let d = Path::new(destination);
if !d.exists() {

View File

@ -75,7 +75,7 @@ impl Namespace {
self
}
// setup_persistent_ns creates persistent namespace without switching to it.
// setup creates persistent namespace without switching to it.
// Note, pid namespaces cannot be persisted.
pub fn setup(mut self) -> Result<Self, String> {
if let Err(err) = fs::create_dir_all(&self.persistent_ns_dir) {

View File

@ -76,7 +76,6 @@ macro_rules! sl {
#[derive(Clone)]
pub struct agentService {
sandbox: Arc<Mutex<Sandbox>>,
test: u32,
}
impl agentService {
@ -101,7 +100,6 @@ impl agentService {
// re-scan PCI bus
// looking for hidden devices
rescan_pci_bus().context("Could not rescan PCI bus")?;
// Some devices need some extra processing (the ones invoked with
@ -183,12 +181,9 @@ impl agentService {
let mut s = sandbox.lock().unwrap();
let sid = s.id.clone();
let ctr: &mut LinuxContainer = match s.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(anyhow!(nix::Error::from_errno(Errno::EINVAL)));
}
};
let ctr = s
.get_container(&cid)
.ok_or(anyhow!("Invalid container id"))?;
ctr.exec()?;
@ -208,18 +203,7 @@ impl agentService {
let cid = req.container_id.clone();
let mut cmounts: Vec<String> = vec![];
if req.timeout == 0 {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(anyhow!(nix::Error::from_errno(Errno::EINVAL)));
}
};
ctr.destroy()?;
let mut remove_container_resources = |sandbox: &mut Sandbox| -> Result<()> {
// Find the sandbox storage used by this container
let mounts = sandbox.container_mounts.get(&cid);
if mounts.is_some() {
@ -240,6 +224,19 @@ impl agentService {
sandbox.container_mounts.remove(cid.as_str());
sandbox.containers.remove(cid.as_str());
Ok(())
};
if req.timeout == 0 {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
let ctr = sandbox
.get_container(&cid)
.ok_or(anyhow!("Invalid container id"))?;
ctr.destroy()?;
remove_container_resources(&mut sandbox)?;
return Ok(());
}
@ -251,15 +248,14 @@ impl agentService {
let handle = thread::spawn(move || {
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid2.as_str()) {
Some(cr) => cr,
None => {
return;
}
};
let _ctr = sandbox
.get_container(&cid2)
.ok_or(anyhow!("Invalid container id"))
.and_then(|ctr| {
ctr.destroy().unwrap();
tx.send(1).unwrap();
Ok(ctr)
});
});
if let Err(_) = rx.recv_timeout(Duration::from_secs(req.timeout as u64)) {
@ -275,26 +271,7 @@ impl agentService {
let s = self.sandbox.clone();
let mut sandbox = s.lock().unwrap();
// Find the sandbox storage used by this container
let mounts = sandbox.container_mounts.get(&cid);
if mounts.is_some() {
let mounts = mounts.unwrap();
remove_mounts(&mounts)?;
for m in mounts.iter() {
if sandbox.storages.get(m).is_some() {
cmounts.push(m.to_string());
}
}
}
for m in cmounts.iter() {
sandbox.unset_and_remove_sandbox_storage(m)?;
}
sandbox.container_mounts.remove(&cid);
sandbox.containers.remove(cid.as_str());
remove_container_resources(&mut sandbox)?;
Ok(())
}
@ -308,7 +285,6 @@ impl agentService {
let s = self.sandbox.clone();
let mut sandbox = s.lock().unwrap();
// ignore string_user, not sure what it is
let process = if req.process.is_some() {
req.process.as_ref().unwrap()
} else {
@ -319,12 +295,9 @@ impl agentService {
let ocip = rustjail::process_grpc_to_oci(process);
let p = Process::new(&sl!(), &ocip, exec_id.as_str(), false, pipe_size)?;
let ctr = match sandbox.get_container(cid.as_str()) {
Some(v) => v,
None => {
return Err(anyhow!(nix::Error::from_errno(nix::errno::Errno::EINVAL)));
}
};
let ctr = sandbox
.get_container(&cid)
.ok_or(anyhow!("Invalid container id"))?;
ctr.run(p)?;
@ -404,12 +377,9 @@ impl agentService {
}
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(anyhow!(nix::Error::from_errno(Errno::EINVAL)));
}
};
let ctr = sandbox
.get_container(&cid)
.ok_or(anyhow!("Invalid container id"))?;
let mut p = match ctr.processes.get_mut(&pid) {
Some(p) => p,
@ -570,10 +540,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
ttrpc::Code::INTERNAL,
e.to_string(),
))),
Ok(_) => {
info!(sl!(), "exec process!\n");
Ok(Empty::new())
}
Ok(_) => Ok(Empty::new()),
}
}
@ -590,6 +557,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(_) => Ok(Empty::new()),
}
}
fn exec_process(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -603,6 +571,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(_) => Ok(Empty::new()),
}
}
fn signal_process(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -616,6 +585,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(_) => Ok(Empty::new()),
}
}
fn wait_process(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -629,6 +599,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) => Ok(resp),
}
}
fn list_processes(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -642,15 +613,12 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let ctr = sandbox
.get_container(&cid)
.ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)));
}
};
)))?;
let pids = ctr.processes().unwrap();
@ -683,15 +651,10 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let out: String = String::from_utf8(output.stdout).unwrap();
let mut lines: Vec<String> = out.split('\n').map(|v| v.to_string()).collect();
let predicate = |v| {
if v == "PID" {
return true;
} else {
return false;
}
};
let pid_index = lines[0].split_whitespace().position(predicate).unwrap();
let pid_index = lines[0]
.split_whitespace()
.position(|v| v == "PID")
.unwrap();
let mut result = String::new();
result.push_str(lines[0].as_str());
@ -720,6 +683,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
resp.process_list = Vec::from(result);
Ok(resp)
}
fn update_container(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -731,15 +695,12 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
let ctr = sandbox
.get_container(&cid)
.ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)));
}
};
)))?;
let resp = Empty::new();
@ -759,6 +720,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp)
}
fn stats_container(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -768,15 +730,12 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
let ctr: &mut LinuxContainer = match sandbox.get_container(cid.as_str()) {
Some(cr) => cr,
None => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
let ctr = sandbox
.get_container(&cid)
.ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid container id".to_string(),
)));
}
};
)))?;
match ctr.stats() {
Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
@ -795,22 +754,19 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
if let Some(ctr) = sandbox.get_container(cid) {
match ctr.pause() {
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
Ok(_) => return Ok(Empty::new()),
}
};
Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let ctr = sandbox
.get_container(&cid)
.ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid argument".to_string(),
)))
"invalid container id".to_string(),
)))?;
ctr.pause().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string()))
})?;
Ok(Empty::new())
}
fn resume_container(
@ -821,22 +777,19 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
if let Some(ctr) = sandbox.get_container(cid) {
match ctr.resume() {
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
Ok(_) => return Ok(Empty::new()),
}
};
Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let ctr = sandbox
.get_container(&cid)
.ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid argument: ".to_string(),
)))
"invalid container id".to_string(),
)))?;
ctr.resume().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string()))
})?;
Ok(Empty::new())
}
fn write_stdin(
@ -852,6 +805,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) => Ok(resp),
}
}
fn read_stdout(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -865,6 +819,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) => Ok(resp),
}
}
fn read_stderr(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -878,6 +833,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) => Ok(resp),
}
}
fn close_stdin(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -973,18 +929,18 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let rtnl = sandbox.rtnl.as_mut().unwrap();
let iface = match rtnl.update_interface(interface.as_ref().unwrap()) {
Ok(v) => v,
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let iface = rtnl
.update_interface(interface.as_ref().unwrap())
.map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
format!("update interface: {:?}", e),
)));
}
};
))
})?;
Ok(iface)
}
fn update_routes(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1001,16 +957,15 @@ impl protocols::agent_ttrpc::AgentService for agentService {
}
let rtnl = sandbox.rtnl.as_mut().unwrap();
// get current routes to return when error out
let crs = match rtnl.list_routes() {
Ok(routes) => routes,
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let crs = rtnl.list_routes().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
format!("update routes: {:?}", e),
)));
}
};
))
})?;
let v = match rtnl.update_routes(rs.as_ref()) {
Ok(value) => value,
Err(_) => crs,
@ -1020,6 +975,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(routes)
}
fn list_interfaces(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1034,20 +990,18 @@ impl protocols::agent_ttrpc::AgentService for agentService {
}
let rtnl = sandbox.rtnl.as_mut().unwrap();
let v = match rtnl.list_interfaces() {
Ok(value) => value,
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let v = rtnl.list_interfaces().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
format!("list interface: {:?}", e),
)));
}
};
))
})?;
interface.set_Interfaces(RepeatedField::from_vec(v));
Ok(interface)
}
fn list_routes(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1063,28 +1017,27 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let rtnl = sandbox.rtnl.as_mut().unwrap();
let v = match rtnl.list_routes() {
Ok(value) => value,
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
let v = rtnl.list_routes().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
format!("list routes: {:?}", e),
)));
}
};
))
})?;
routes.set_Routes(RepeatedField::from_vec(v));
Ok(routes)
}
fn start_tracing(
&self,
_ctx: &ttrpc::TtrpcContext,
req: protocols::agent::StartTracingRequest,
) -> ttrpc::Result<Empty> {
info!(sl!(), "start_tracing {:?} self.test={}", req, self.test);
info!(sl!(), "start_tracing {:?}", req);
Ok(Empty::new())
}
fn stop_tracing(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1122,26 +1075,14 @@ impl protocols::agent_ttrpc::AgentService for agentService {
}
for m in req.kernel_modules.iter() {
match load_kernel_module(m) {
Ok(_) => (),
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
}
let _ = load_kernel_module(m).map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string()))
})?;
}
match s.setup_shared_namespaces() {
Ok(_) => (),
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
}
s.setup_shared_namespaces().map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string()))
})?;
}
match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone()) {
@ -1195,6 +1136,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new())
}
fn add_arp_neighbors(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1211,15 +1153,13 @@ impl protocols::agent_ttrpc::AgentService for agentService {
let rtnl = sandbox.rtnl.as_mut().unwrap();
if let Err(e) = rtnl.add_arp_neighbors(neighs.as_ref()) {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)));
}
rtnl.add_arp_neighbors(neighs.as_ref()).map_err(|e| {
ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string()))
})?;
Ok(Empty::new())
}
fn online_cpu_mem(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1237,6 +1177,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new())
}
fn reseed_random_dev(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1251,6 +1192,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new())
}
fn get_guest_details(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1279,6 +1221,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp)
}
fn mem_hotplug_by_probe(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1293,6 +1236,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new())
}
fn set_guest_date_time(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1307,6 +1251,7 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new())
}
fn copy_file(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1382,6 +1327,7 @@ impl protocols::health_ttrpc::Health for healthService {
Ok(resp)
}
fn version(
&self,
_ctx: &ttrpc::TtrpcContext,
@ -1500,33 +1446,23 @@ fn find_process<'a>(
eid: &'a str,
init: bool,
) -> Result<&'a mut Process> {
let ctr = match sandbox.get_container(cid) {
Some(v) => v,
None => return Err(anyhow!("Invalid container id")),
};
let ctr = sandbox
.get_container(cid)
.ok_or(anyhow!("Invalid container id"))?;
if init || eid == "" {
let p = match ctr.processes.get_mut(&ctr.init_process_pid) {
Some(v) => v,
None => return Err(anyhow!("cannot find init process!")),
};
return Ok(p);
return ctr
.processes
.get_mut(&ctr.init_process_pid)
.ok_or(anyhow!("cannot find init process!"));
}
let p = match ctr.get_process(eid) {
Ok(v) => v,
Err(_) => return Err(anyhow!("Invalid exec id")),
};
Ok(p)
ctr.get_process(eid).map_err(|_| anyhow!("Invalid exec id"))
}
pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str) -> ttrpc::Server {
let agent_service = Box::new(agentService {
sandbox: s,
test: 1,
}) as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>;
let agent_service = Box::new(agentService { sandbox: s })
as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>;
let agent_worker = Arc::new(agent_service);
@ -1565,10 +1501,10 @@ fn update_container_namespaces(
spec: &mut Spec,
sandbox_pidns: bool,
) -> Result<()> {
let linux = match spec.linux.as_mut() {
None => return Err(anyhow!("Spec didn't container linux field")),
Some(l) => l,
};
let linux = spec
.linux
.as_mut()
.ok_or(anyhow!("Spec didn't container linux field"))?;
let namespaces = linux.namespaces.as_mut_slice();
for namespace in namespaces.iter_mut() {