agent: add cgroup v2 support

This PR adds basic cgroup v2 support to the agent.

Fixes: #146, #357

Signed-off-by: bin liu <bin@hyper.sh>
This commit is contained in:
bin liu 2020-08-24 19:44:16 +08:00
parent 63138a4f28
commit 15065e4472
15 changed files with 1454 additions and 1338 deletions

29
src/agent/Cargo.lock generated
View File

@ -97,6 +97,17 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cgroups"
version = "0.1.1-alpha.0"
source = "git+https://github.com/kata-containers/cgroups-rs?tag=0.1.1#3852d7c1805499cd6a0e37ec400d81a7085d91a7"
dependencies = [
"libc",
"log",
"nix 0.18.0",
"regex",
]
[[package]]
name = "chrono"
version = "0.4.11"
@ -240,6 +251,7 @@ name = "kata-agent"
version = "0.1.0"
dependencies = [
"anyhow",
"cgroups",
"lazy_static",
"libc",
"logging",
@ -271,9 +283,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.70"
version = "0.2.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f"
checksum = "f2f96b10ec2560088a8e76961b00d47107b3a625fecb76dedb29ee7ccbf98235"
[[package]]
name = "libflate"
@ -363,6 +375,18 @@ dependencies = [
"void",
]
[[package]]
name = "nix"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83450fe6a6142ddd95fb064b746083fc4ef1705fe81f64a64e1d4b39f54a1055"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
]
[[package]]
name = "num-integer"
version = "0.1.42"
@ -619,6 +643,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"caps",
"cgroups",
"dirs",
"lazy_static",
"libc",

View File

@ -32,6 +32,7 @@ tempfile = "3.1.0"
prometheus = { version = "0.9.0", features = ["process"] }
procfs = "0.7.9"
anyhow = "1.0.32"
cgroups = { git = "https://github.com/kata-containers/cgroups-rs", tag = "0.1.1"}
[workspace]
members = [

View File

@ -24,3 +24,4 @@ regex = "1.1"
path-absolutize = "1.2.0"
dirs = "3.0.1"
anyhow = "1.0.32"
cgroups = { git = "https://github.com/kata-containers/cgroups-rs", tag = "0.1.1"}

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 Ant Financial
// Copyright (c) 2019,2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
@ -9,10 +9,11 @@ use oci::LinuxResources;
use protocols::agent::CgroupStats;
use std::collections::HashMap;
pub mod fs;
pub mod systemd;
use cgroups::freezer::FreezerState;
pub type FreezerState = &'static str;
pub mod fs;
pub mod notifier;
pub mod systemd;
pub trait Manager {
fn apply(&self, _pid: i32) -> Result<()> {
@ -23,10 +24,6 @@ pub trait Manager {
Err(anyhow!("not supported!"))
}
fn get_all_pids(&self) -> Result<Vec<i32>> {
Err(anyhow!("not supported!"))
}
fn get_stats(&self) -> Result<CgroupStats> {
Err(anyhow!("not supported!"))
}
@ -39,10 +36,6 @@ pub trait Manager {
Err(anyhow!("not supported!"))
}
fn get_paths(&self) -> Result<HashMap<String, String>> {
Err(anyhow!("not supported!"))
}
fn set(&self, _container: &LinuxResources, _update: bool) -> Result<()> {
Err(anyhow!("not supported!"))
}

View File

@ -0,0 +1,209 @@
// Copyright (c) 2020 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{anyhow, Result};
use eventfd::{eventfd, EfdFlags};
use nix::sys::eventfd;
use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify};
use std::fs::{self, File};
use std::io::Read;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{self, Receiver};
use std::thread;
// Convenience macro to obtain the scope logger.
//
// Expands to a child of the current `slog_scope` logger that carries the
// key/value pair `"subsystem" => "cgroups_notifier"`, so every record logged
// through `sl!()` in this file is tagged with that subsystem.
macro_rules! sl {
() => {
slog_scope::logger().new(o!("subsystem" => "cgroups_notifier"))
};
}
// notify_oom registers an OOM watcher for the container `cid` whose memory
// cgroup lives at `cg_dir`.
//
// It dispatches to the cgroup v2 implementation when the host runs in unified
// mode and to the cgroup v1 implementation otherwise. The returned channel
// yields the container id for every OOM event that is observed.
pub fn notify_oom(cid: &str, cg_dir: String) -> Result<Receiver<String>> {
    let unified = cgroups::hierarchies::is_cgroup2_unified_mode();

    if unified {
        notify_on_oom_v2(cid, cg_dir)
    } else {
        notify_on_oom(cid, cg_dir)
    }
}
// get_value_from_cgroup parse cgroup file with `Flat keyed`
// and get the value of `key`.
// Flat keyed file format:
// KEY0 VAL0\n
// KEY1 VAL1\n
fn get_value_from_cgroup(path: &PathBuf, key: &str) -> Result<i64> {
let content = fs::read_to_string(path)?;
info!(
sl!(),
"get_value_from_cgroup file: {:?}, content: {}", &path, &content
);
for line in content.lines() {
let arr: Vec<&str> = line.split(" ").collect();
if arr.len() == 2 && arr[0] == key {
let r = arr[1].parse::<i64>()?;
return Ok(r);
}
}
Ok(0)
}
// notify_on_oom_v2 returns a channel on which the container id is delivered
// when an OOM kill is observed for a cgroup v2 container.
// If the container exits without an OOM the channel is closed instead.
pub fn notify_on_oom_v2(containere_id: &str, cg_dir: String) -> Result<Receiver<String>> {
    // File names, relative to `cg_dir`, that the v2 watcher monitors.
    let memory_events = "memory.events";
    let cgroup_events = "cgroup.events";

    register_memory_event_v2(containere_id, cg_dir, memory_events, cgroup_events)
}
fn register_memory_event_v2(
containere_id: &str,
cg_dir: String,
memory_event_name: &str,
cgroup_event_name: &str,
) -> Result<Receiver<String>> {
let event_control_path = Path::new(&cg_dir).join(memory_event_name);
let cgroup_event_control_path = Path::new(&cg_dir).join(cgroup_event_name);
info!(
sl!(),
"register_memory_event_v2 event_control_path: {:?}", &event_control_path
);
info!(
sl!(),
"register_memory_event_v2 cgroup_event_control_path: {:?}", &cgroup_event_control_path
);
let fd = Inotify::init(InitFlags::empty()).unwrap();
// watching oom kill
let ev_fd = fd
.add_watch(&event_control_path, AddWatchFlags::IN_MODIFY)
.unwrap();
// Because no `unix.IN_DELETE|unix.IN_DELETE_SELF` event for cgroup file system, so watching all process exited
let cg_fd = fd
.add_watch(&cgroup_event_control_path, AddWatchFlags::IN_MODIFY)
.unwrap();
info!(sl!(), "ev_fd: {:?}", ev_fd);
info!(sl!(), "cg_fd: {:?}", cg_fd);
let (sender, receiver) = mpsc::channel();
let containere_id = containere_id.to_string();
thread::spawn(move || {
loop {
let events = fd.read_events().unwrap();
info!(
sl!(),
"container[{}] get events for container: {:?}", &containere_id, &events
);
for event in events {
if event.mask & AddWatchFlags::IN_MODIFY != AddWatchFlags::IN_MODIFY {
continue;
}
info!(sl!(), "event.wd: {:?}", event.wd);
match event.wd {
ev_fd => {
let oom = get_value_from_cgroup(&event_control_path, "oom_kill");
if oom.unwrap_or(0) > 0 {
sender.send(containere_id.clone()).unwrap();
return;
}
}
cg_fd => {
let pids = get_value_from_cgroup(&cgroup_event_control_path, "populated");
if pids.unwrap_or(-1) == 0 {
return;
}
}
}
}
// When a cgroup is destroyed, an event is sent to eventfd.
// So if the control path is gone, return instead of notifying.
if !Path::new(&event_control_path).exists() {
return;
}
}
});
Ok(receiver)
}
// notify_on_oom (cgroup v1) returns a channel that receives the container id
// on each OOM event reported through `memory.oom_control`.
// An empty `dir` is rejected with an error.
fn notify_on_oom(cid: &str, dir: String) -> Result<Receiver<String>> {
    if dir.is_empty() {
        Err(anyhow!("memory controller missing"))
    } else {
        register_memory_event(cid, dir, "memory.oom_control", "")
    }
}
// notify_memory_pressure registers a `memory.pressure_level` watcher.
// `level` must be one of "low", "medium", or "critical"; any other value,
// or an empty `dir`, is rejected with an error.
fn notify_memory_pressure(cid: &str, dir: String, level: &str) -> Result<Receiver<String>> {
    if dir.is_empty() {
        return Err(anyhow!("memory controller missing"));
    }

    match level {
        "low" | "medium" | "critical" => {
            register_memory_event(cid, dir, "memory.pressure_level", level)
        }
        _ => Err(anyhow!("invalid pressure level {}", level)),
    }
}
// register_memory_event (cgroup v1) registers an eventfd for `event_name`
// through `cgroup.event_control` and returns a channel that receives the
// container id `cid` each time the eventfd fires.
//
// `arg` is appended to the registration line when non-empty (e.g. the
// pressure level for `memory.pressure_level`).
//
// The watcher thread stops when reading the eventfd fails, when
// `cgroup.event_control` no longer exists, or when the receiver has been
// dropped.
fn register_memory_event(
    cid: &str,
    cg_dir: String,
    event_name: &str,
    arg: &str,
) -> Result<Receiver<String>> {
    let path = Path::new(&cg_dir).join(event_name);
    // NOTE(review): `event_file` is dropped (and its fd closed) when this
    // function returns; confirm the registration does not need it kept open.
    let event_file = File::open(&path)?;

    let raw_efd = eventfd(0, EfdFlags::EFD_CLOEXEC)?;
    // SAFETY: `raw_efd` was just returned by `eventfd()` and is owned by
    // nothing else. Wrapping it immediately makes sure it is closed on every
    // early-return path below instead of leaking.
    let mut eventfd_file = unsafe { File::from_raw_fd(raw_efd) };

    let event_control_path = Path::new(&cg_dir).join("cgroup.event_control");
    let data = if arg.is_empty() {
        format!("{} {}", eventfd_file.as_raw_fd(), event_file.as_raw_fd())
    } else {
        format!(
            "{} {} {}",
            eventfd_file.as_raw_fd(),
            event_file.as_raw_fd(),
            arg
        )
    };

    fs::write(&event_control_path, data)?;

    let (sender, receiver) = mpsc::channel();
    let containere_id = cid.to_string();

    thread::spawn(move || {
        loop {
            let mut buf = [0; 8];
            match eventfd_file.read(&mut buf) {
                Err(err) => {
                    warn!(sl!(), "failed to read from eventfd: {:?}", err);
                    return;
                }
                Ok(_) => {
                    let content = fs::read_to_string(&path);
                    info!(
                        sl!(),
                        "OOM event for container: {}, content: {:?}", &containere_id, content
                    );
                }
            }

            // When a cgroup is destroyed, an event is sent to eventfd.
            // So if the control path is gone, return instead of notifying.
            if !Path::new(&event_control_path).exists() {
                return;
            }

            // A send error means the receiver was dropped; nobody is
            // listening any more, so stop watching instead of panicking.
            if sender.send(containere_id.clone()).is_err() {
                return;
            }
        }
    });

    Ok(receiver)
}

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019 Ant Financial
// Copyright (c) 2019, 2020 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
@ -21,8 +21,8 @@ use std::clone::Clone;
use std::fmt::Display;
use std::process::{Child, Command};
// use crate::configs::namespaces::{NamespaceType};
use crate::cgroups::Manager as CgroupManager;
use cgroups::freezer::FreezerState;
use crate::process::Process;
// use crate::intelrdt::Manager as RdtManager;
use crate::log_child;
@ -31,6 +31,7 @@ use crate::sync::*;
// use crate::stats::Stats;
use crate::capabilities::{self, CAPSMAP};
use crate::cgroups::fs::{self as fscgroup, Manager as FsManager};
use crate::cgroups::Manager;
use crate::{mount, validator};
use protocols::agent::StatsContainerResponse;
@ -242,9 +243,7 @@ pub trait BaseContainer {
// Or use Mutex<xx> as a member of struct, like C?
// a lot of String in the struct might be &str
#[derive(Debug)]
pub struct LinuxContainer
// where T: CgroupManager
{
pub struct LinuxContainer {
pub id: String,
pub root: String,
pub config: Config,
@ -303,7 +302,7 @@ impl Container for LinuxContainer {
self.cgroup_manager
.as_ref()
.unwrap()
.freeze(fscgroup::FROZEN)?;
.freeze(FreezerState::Frozen)?;
self.status.transition(Status::PAUSED);
return Ok(());
@ -321,7 +320,7 @@ impl Container for LinuxContainer {
self.cgroup_manager
.as_ref()
.unwrap()
.freeze(fscgroup::THAWED)?;
.freeze(FreezerState::Thawed)?;
self.status.transition(Status::RUNNING);
return Ok(());
@ -352,6 +351,7 @@ fn do_init_child(cwfd: RawFd) -> Result<()> {
lazy_static::initialize(&CAPSMAP);
let init = std::env::var(INIT)?.eq(format!("{}", true).as_str());
let no_pivot = std::env::var(NO_PIVOT)?.eq(format!("{}", true).as_str());
let crfd = std::env::var(CRFD_FD)?.parse::<i32>().unwrap();
let cfd_log = std::env::var(CLOG_FD)?.parse::<i32>().unwrap();
@ -372,6 +372,7 @@ fn do_init_child(cwfd: RawFd) -> Result<()> {
let buf = read_sync(crfd)?;
let cm_str = std::str::from_utf8(&buf)?;
let cm: FsManager = serde_json::from_str(cm_str)?;
let p = if spec.process.is_some() {
@ -752,8 +753,6 @@ impl BaseContainer for LinuxContainer {
}
info!(logger, "exec fifo opened!");
fscgroup::init_static();
if self.config.spec.is_none() {
return Err(anyhow!("no spec"));
}
@ -1125,7 +1124,7 @@ fn join_namespaces(
p: &Process,
cm: &FsManager,
st: &OCIState,
child: &mut Child,
_child: &mut Child,
pwfd: RawFd,
prfd: RawFd,
) -> Result<()> {
@ -1326,6 +1325,7 @@ impl LinuxContainer {
};
let cgroup_manager = FsManager::new(cpath.as_str())?;
info!(logger, "new cgroup_manager {:?}", &cgroup_manager);
Ok(LinuxContainer {
id: id.clone(),
@ -1349,48 +1349,6 @@ impl LinuxContainer {
fn load<T: Into<String>>(_id: T, _base: T) -> Result<Self> {
Err(anyhow!("not supported"))
}
/*
fn new_parent_process(&self, p: &Process) -> Result<Box<ParentProcess>> {
let (pfd, cfd) = socket::socketpair(AddressFamily::Unix,
SockType::Stream, SockProtocol::Tcp,
SockFlag::SOCK_CLOEXEC)?;
let cmd = Command::new(self.init_path)
.args(self.init_args[1..])
.env("_LIBCONTAINER_INITPIPE", format!("{}",
cfd))
.env("_LIBCONTAINER_STATEDIR", self.root)
.current_dir(Path::new(self.config.rootfs))
.stdin(p.stdin)
.stdout(p.stdout)
.stderr(p.stderr);
if p.console_socket.is_some() {
cmd.env("_LIBCONTAINER_CONSOLE", format!("{}",
unsafe { p.console_socket.unwrap().as_raw_fd() }));
}
if !p.init {
return self.new_setns_process(p, cmd, pfd, cfd);
}
let fifo_file = format!("{}/{}", self.root, EXEC_FIFO_FILENAME);
let fifofd = fcntl::open(fifo_file,
OFlag::O_PATH | OFlag::O_CLOEXEC,
Mode::from_bits(0).unwrap())?;
cmd.env("_LIBCONTAINER_FIFOFD", format!("{}", fifofd));
self.new_init_process(p, cmd, pfd, cfd)
}
fn new_setns_process(&self, p: &Process, cmd: &mut Command, pfd: Rawfd, cfd: Rawfd) -> Result<SetnsProcess> {
}
fn new_init_process(&self, p: &Process, cmd: &mut Command, pfd: Rawfd, cfd: Rawfd) -> Result<InitProcess> {
cmd.env("_LINCONTAINER_INITTYPE", INITSTANDARD);
}
*/
}
// Handle the differing rlimit types for different targets

View File

@ -176,6 +176,38 @@ pub fn init_rootfs(
Ok(())
}
// mount_cgroups_v2 mounts a `cgroup2` file system at `m.destination` inside
// `rootfs`.
//
// The mount is performed with the working directory switched to `rootfs`;
// the previous working directory is restored afterwards, including when the
// mount itself fails. When `flags` contains MS_RDONLY the destination is
// additionally bind-remounted with the same flags.
fn mount_cgroups_v2(cfd_log: RawFd, m: &Mount, rootfs: &str, flags: MsFlags) -> Result<()> {
    let olddir = unistd::getcwd()?;
    unistd::chdir(rootfs)?;

    // https://github.com/opencontainers/runc/blob/09ddc63afdde16d5fb859a1d3ab010bd45f08497/libcontainer/rootfs_linux.go#L287
    let bm = Mount {
        source: "cgroup".to_string(),
        r#type: "cgroup2".to_string(),
        destination: m.destination.clone(),
        options: Vec::new(),
    };

    // Run the mount, then restore the working directory before propagating
    // any error so a failed mount does not leave us inside `rootfs`.
    let mounted = mount_from(cfd_log, &bm, rootfs, flags, "", "");
    let restored = unistd::chdir(&olddir);
    mounted?;
    restored?;

    if flags.contains(MsFlags::MS_RDONLY) {
        let dest = format!("{}{}", rootfs, m.destination.as_str());
        mount::mount(
            Some(dest.as_str()),
            dest.as_str(),
            None::<&str>,
            flags | MsFlags::MS_BIND | MsFlags::MS_REMOUNT,
            None::<&str>,
        )?;
    }

    Ok(())
}
fn mount_cgroups(
cfd_log: RawFd,
m: &Mount,
@ -185,6 +217,9 @@ fn mount_cgroups(
cpath: &HashMap<String, String>,
mounts: &HashMap<String, String>,
) -> Result<()> {
if cgroups::hierarchies::is_cgroup2_unified_mode() {
return mount_cgroups_v2(cfd_log, &m, rootfs, flags);
}
// mount tmpfs
let ctm = Mount {
source: "tmpfs".to_string(),
@ -194,7 +229,6 @@ fn mount_cgroups(
};
let cflags = MsFlags::MS_NOEXEC | MsFlags::MS_NOSUID | MsFlags::MS_NODEV;
// info!(logger, "tmpfs");
mount_from(cfd_log, &ctm, rootfs, cflags, "", "")?;
let olddir = unistd::getcwd()?;
@ -527,29 +561,23 @@ fn mount_from(
if src.is_file() {
let _ = OpenOptions::new().create(true).write(true).open(&dest);
}
src
src.to_str().unwrap().to_string()
} else {
let _ = fs::create_dir_all(&dest);
PathBuf::from(&m.source)
};
// ignore this check since some mount's src didn't been a directory
// such as tmpfs.
/*
match stat::stat(src.to_str().unwrap()) {
Ok(_) => {}
Err(e) => {
info!("{}: {}", src.to_str().unwrap(), e.as_errno().unwrap().desc());
}
if m.r#type.as_str() == "cgroup2" {
"cgroup2".to_string()
} else {
let tmp = PathBuf::from(&m.source);
tmp.to_str().unwrap().to_string()
}
*/
};
match stat::stat(dest.as_str()) {
Ok(_) => {}
Err(e) => {
log_child!(
cfd_log,
"{}: {}",
"dest stat error. {}: {}",
dest.as_str(),
e.as_errno().unwrap().desc()
);
@ -557,7 +585,7 @@ fn mount_from(
}
match mount::mount(
Some(src.to_str().unwrap()),
Some(src.as_str()),
dest.as_str(),
Some(m.r#type.as_str()),
flags,

View File

@ -14,6 +14,7 @@ const HOTPLUG_TIMOUT_OPTION: &str = "agent.hotplug_timeout";
const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport";
const LOG_VPORT_OPTION: &str = "agent.log_vport";
const CONTAINER_PIPE_SIZE_OPTION: &str = "agent.container_pipe_size";
const UNIFIED_CGROUP_HIERARCHY_OPTION: &str = "agent.unified_cgroup_hierarchy";
const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info;
const DEFAULT_HOTPLUG_TIMEOUT: time::Duration = time::Duration::from_secs(3);
@ -36,6 +37,7 @@ pub struct agentConfig {
pub log_vport: i32,
pub container_pipe_size: i32,
pub server_addr: String,
pub unified_cgroup_hierarchy: bool,
}
impl agentConfig {
@ -49,6 +51,7 @@ impl agentConfig {
log_vport: 0,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
server_addr: format!("{}:{}", VSOCK_ADDR, VSOCK_PORT),
unified_cgroup_hierarchy: false,
}
}
@ -56,6 +59,7 @@ impl agentConfig {
let cmdline = fs::read_to_string(file)?;
let params: Vec<&str> = cmdline.split_ascii_whitespace().collect();
for param in params.iter() {
// parse cmdline flags
if param.eq(&DEBUG_CONSOLE_FLAG) {
self.debug_console = true;
}
@ -64,6 +68,7 @@ impl agentConfig {
self.dev_mode = true;
}
// parse cmdline options
if param.starts_with(format!("{}=", LOG_LEVEL_OPTION).as_str()) {
let level = get_log_level(param)?;
self.log_level = level;
@ -95,6 +100,11 @@ impl agentConfig {
let container_pipe_size = get_container_pipe_size(param)?;
self.container_pipe_size = container_pipe_size
}
if param.starts_with(format!("{}=", UNIFIED_CGROUP_HIERARCHY_OPTION).as_str()) {
let b = get_bool_value(param, false);
self.unified_cgroup_hierarchy = b;
}
}
if let Ok(addr) = env::var(SERVER_ADDR_ENV_VAR) {
@ -175,6 +185,34 @@ fn get_hotplug_timeout(param: &str) -> Result<time::Duration> {
Ok(time::Duration::from_secs(value.unwrap()))
}
// get_bool_value extracts a boolean from a `key=value` kernel cmdline
// parameter.
//
// The value may be spelled `true` / `false`, or as an unsigned integer where
// `0` means false and any other number means true. `default` is returned
// when `param` is not exactly one `key=value` pair or when the value is
// neither a bool nor an unsigned integer.
fn get_bool_value(param: &str, default: bool) -> bool {
    // Exactly two `=`-separated fields are required.
    let mut pieces = param.split("=");
    let raw = match (pieces.next(), pieces.next(), pieces.next()) {
        (Some(_), Some(value), None) => value,
        _ => return default,
    };

    // bool
    if let Ok(flag) = raw.parse::<bool>() {
        return flag;
    }

    // integer: only `0` maps to false, every other number maps to true
    match raw.parse::<u64>() {
        Ok(number) => number != 0,
        Err(_) => default,
    }
}
fn get_container_pipe_size(param: &str) -> Result<i32> {
let fields: Vec<&str> = param.split("=").collect();
@ -269,6 +307,7 @@ mod tests {
log_level: slog::Level,
hotplug_timeout: time::Duration,
container_pipe_size: i32,
unified_cgroup_hierarchy: bool,
}
let tests = &[
@ -279,6 +318,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.debug_console agent.devmodex",
@ -287,6 +327,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.logx=debug",
@ -295,6 +336,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.log=debug",
@ -303,6 +345,7 @@ mod tests {
log_level: slog::Level::Debug,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "",
@ -311,6 +354,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo",
@ -319,6 +363,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo bar",
@ -327,6 +372,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo bar",
@ -335,6 +381,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent bar",
@ -343,6 +390,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo debug_console agent bar devmode",
@ -351,6 +399,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.debug_console",
@ -359,6 +408,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: " agent.debug_console ",
@ -367,6 +417,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.debug_console foo",
@ -375,6 +426,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: " agent.debug_console foo",
@ -383,6 +435,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.debug_console bar",
@ -391,6 +444,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.debug_console",
@ -399,6 +453,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.debug_console ",
@ -407,6 +462,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode",
@ -415,6 +471,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: " agent.devmode ",
@ -423,6 +480,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode foo",
@ -431,6 +489,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: " agent.devmode foo",
@ -439,6 +498,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.devmode bar",
@ -447,6 +507,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.devmode",
@ -455,6 +516,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "foo agent.devmode ",
@ -463,6 +525,7 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode agent.debug_console",
@ -471,54 +534,61 @@ mod tests {
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode agent.debug_console agent.hotplug_timeout=100",
contents: "agent.devmode agent.debug_console agent.hotplug_timeout=100 agent.unified_cgroup_hierarchy=a",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: time::Duration::from_secs(100),
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode agent.debug_console agent.hotplug_timeout=0",
contents: "agent.devmode agent.debug_console agent.hotplug_timeout=0 agent.unified_cgroup_hierarchy=11",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: true,
},
TestData {
contents: "agent.devmode agent.debug_console agent.container_pipe_size=2097152",
contents: "agent.devmode agent.debug_console agent.container_pipe_size=2097152 agent.unified_cgroup_hierarchy=false",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: 2097152,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode agent.debug_console agent.container_pipe_size=100",
contents: "agent.devmode agent.debug_console agent.container_pipe_size=100 agent.unified_cgroup_hierarchy=true",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: 100,
unified_cgroup_hierarchy: true,
},
TestData {
contents: "agent.devmode agent.debug_console agent.container_pipe_size=0",
contents: "agent.devmode agent.debug_console agent.container_pipe_size=0 agent.unified_cgroup_hierarchy=0",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: false,
},
TestData {
contents: "agent.devmode agent.debug_console agent.container_pip_siz=100",
contents: "agent.devmode agent.debug_console agent.container_pip_siz=100 agent.unified_cgroup_hierarchy=1",
debug_console: true,
dev_mode: true,
log_level: DEFAULT_LOG_LEVEL,
hotplug_timeout: DEFAULT_HOTPLUG_TIMEOUT,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
unified_cgroup_hierarchy: true,
},
];
@ -550,6 +620,7 @@ mod tests {
let mut config = agentConfig::new();
assert_eq!(config.debug_console, false, "{}", msg);
assert_eq!(config.dev_mode, false, "{}", msg);
assert_eq!(config.unified_cgroup_hierarchy, false, "{}", msg);
assert_eq!(
config.hotplug_timeout,
time::Duration::from_secs(3),
@ -563,6 +634,11 @@ mod tests {
assert_eq!(d.debug_console, config.debug_console, "{}", msg);
assert_eq!(d.dev_mode, config.dev_mode, "{}", msg);
assert_eq!(
d.unified_cgroup_hierarchy, config.unified_cgroup_hierarchy,
"{}",
msg
);
assert_eq!(d.log_level, config.log_level, "{}", msg);
assert_eq!(d.hotplug_timeout, config.hotplug_timeout, "{}", msg);
assert_eq!(d.container_pipe_size, config.container_pipe_size, "{}", msg);

View File

@ -131,6 +131,15 @@ fn main() -> Result<()> {
let writer = unsafe { File::from_raw_fd(wfd) };
let agentConfig = AGENT_CONFIG.clone();
// once parsed cmdline and set the config, release the write lock
// as soon as possible in case other thread would get read lock on
// it.
{
let mut config = agentConfig.write().unwrap();
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
}
let config = agentConfig.read().unwrap();
let init_mode = unistd::getpid() == Pid::from_raw(1);
if init_mode {
@ -144,18 +153,9 @@ fn main() -> Result<()> {
// since before do the base mount, it wouldn't access "/proc/cmdline"
// to get the customzied debug level.
let logger = logging::create_logger(NAME, "agent", slog::Level::Debug, writer);
init_agent_as_init(&logger)?;
init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?;
}
// once parsed cmdline and set the config, release the write lock
// as soon as possible in case other thread would get read lock on
// it.
{
let mut config = agentConfig.write().unwrap();
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
}
let config = agentConfig.read().unwrap();
let log_vport = config.log_vport as u32;
let log_handle = thread::spawn(move || -> Result<()> {
let mut reader = unsafe { File::from_raw_fd(rfd) };
@ -339,9 +339,9 @@ fn setup_signal_handler(logger: &Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
// init_agent_as_init will do the initializations such as setting up the rootfs
// when this agent has been run as the init process.
fn init_agent_as_init(logger: &Logger) -> Result<()> {
fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
general_mount(logger)?;
cgroups_mount(logger)?;
cgroups_mount(logger, unified_cgroup_hierarchy)?;
fs::remove_file(Path::new("/dev/ptmx"))?;
unixfs::symlink(Path::new("/dev/pts/ptmx"), Path::new("/dev/ptmx"))?;

View File

@ -542,7 +542,22 @@ pub fn get_mount_fs_type_from_file(mount_file: &str, mount_point: &str) -> Resul
))
}
pub fn get_cgroup_mounts(logger: &Logger, cg_path: &str) -> Result<Vec<INIT_MOUNT>> {
pub fn get_cgroup_mounts(
logger: &Logger,
cg_path: &str,
unified_cgroup_hierarchy: bool,
) -> Result<Vec<INIT_MOUNT>> {
// cgroup v2
// https://github.com/kata-containers/agent/blob/8c9bbadcd448c9a67690fbe11a860aaacc69813c/agent.go#L1249
if unified_cgroup_hierarchy {
return Ok(vec![INIT_MOUNT {
fstype: "cgroup2",
src: "cgroup2",
dest: "/sys/fs/cgroup",
options: vec!["nosuid", "nodev", "noexec", "relatime", "nsdelegate"],
}]);
}
let file = File::open(&cg_path)?;
let reader = BufReader::new(file);
@ -617,10 +632,10 @@ pub fn get_cgroup_mounts(logger: &Logger, cg_path: &str) -> Result<Vec<INIT_MOUN
Ok(cg_mounts)
}
pub fn cgroups_mount(logger: &Logger) -> Result<()> {
pub fn cgroups_mount(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result<()> {
let logger = logger.new(o!("subsystem" => "mount"));
let cgroups = get_cgroup_mounts(&logger, PROC_CGROUPS)?;
let cgroups = get_cgroup_mounts(&logger, PROC_CGROUPS, unified_cgroup_hierarchy)?;
for cg in cgroups.iter() {
mount_to_rootfs(&logger, cg)?;
@ -1071,6 +1086,20 @@ mod tests {
}
}
// With `unified_cgroup_hierarchy` set, get_cgroup_mounts must return the
// single cgroup2 mount without reading the cgroups file.
#[test]
fn test_get_cgroup_v2_mounts() {
    let logger = slog::Logger::root(slog::Discard, o!());

    // The cgroups file path is not opened in unified mode, so an empty
    // path is sufficient here.
    let mounts = get_cgroup_mounts(&logger, "", true)
        .expect("get_cgroup_mounts should succeed in unified mode");

    assert_eq!(1, mounts.len());
    assert_eq!(mounts[0].fstype, "cgroup2");
    assert_eq!(mounts[0].src, "cgroup2");
    assert_eq!(mounts[0].dest, "/sys/fs/cgroup");
}
#[test]
fn test_get_cgroup_mounts() {
#[derive(Debug)]
@ -1175,7 +1204,7 @@ mod tests {
];
// First, test a missing file
let result = get_cgroup_mounts(&logger, enoent_filename);
let result = get_cgroup_mounts(&logger, enoent_filename, false);
assert!(result.is_err());
let error_msg = format!("{}", result.unwrap_err());
@ -1198,7 +1227,7 @@ mod tests {
file.write_all(d.contents.as_bytes())
.expect(&format!("{}: failed to write file contents", msg));
let result = get_cgroup_mounts(&logger, filename);
let result = get_cgroup_mounts(&logger, filename, false);
let msg = format!("{}: result: {:?}", msg, result);
if d.error_contains != "" {

View File

@ -12,7 +12,7 @@ use oci::{LinuxNamespace, Root, Spec};
use protobuf::{RepeatedField, SingularPtrField};
use protocols::agent::{
AgentDetails, CopyFileRequest, GuestDetailsResponse, Interfaces, ListProcessesResponse,
Metrics, ReadStreamResponse, Routes, StatsContainerResponse, WaitProcessResponse,
Metrics, OOMEvent, ReadStreamResponse, Routes, StatsContainerResponse, WaitProcessResponse,
WriteStreamResponse,
};
use protocols::empty::Empty;
@ -21,6 +21,7 @@ use protocols::health::{
};
use protocols::types::Interface;
use rustjail;
use rustjail::cgroups::notifier;
use rustjail::container::{BaseContainer, Container, LinuxContainer};
use rustjail::process::Process;
use rustjail::specconv::CreateOpts;
@ -94,7 +95,7 @@ impl agentService {
}
};
info!(sl!(), "receive createcontainer {}", &cid);
info!(sl!(), "receive createcontainer, spec: {:?}", &oci);
// re-scan PCI bus
// looking for hidden devices
@ -178,6 +179,7 @@ impl agentService {
let sandbox = self.sandbox.clone();
let mut s = sandbox.lock().unwrap();
let sid = s.id.clone();
let ctr: &mut LinuxContainer = match s.get_container(cid.as_str()) {
Some(cr) => cr,
@ -188,6 +190,15 @@ impl agentService {
ctr.exec()?;
// start oom event loop
if sid != cid && ctr.cgroup_manager.is_some() {
let cg_path = ctr.cgroup_manager.as_ref().unwrap().get_cg_path("memory");
if cg_path.is_some() {
let rx = notifier::notify_oom(cid.as_str(), cg_path.unwrap())?;
s.run_oom_event_monitor(rx, cid);
}
}
Ok(())
}
@ -329,7 +340,7 @@ impl agentService {
sl!(),
"signal process";
"container-id" => cid.clone(),
"exec-id" => eid.clone()
"exec-id" => eid.clone(),
);
if eid == "" {
@ -488,7 +499,6 @@ impl agentService {
let eid = req.exec_id;
let mut fd: RawFd = -1;
info!(sl!(), "read stdout for {}/{}", cid.clone(), eid.clone());
{
let s = self.sandbox.clone();
let mut sandbox = s.lock().unwrap();
@ -1293,6 +1303,34 @@ impl protocols::agent_ttrpc::AgentService for agentService {
}
}
}
fn get_oom_event(
&self,
_ctx: &ttrpc::TtrpcContext,
_req: protocols::agent::GetOOMEventRequest,
) -> ttrpc::Result<OOMEvent> {
let sandbox = self.sandbox.clone();
let s = sandbox.lock().unwrap();
let event_rx = &s.event_rx.clone();
let event_rx = event_rx.lock().unwrap();
drop(s);
drop(sandbox);
match event_rx.recv() {
Err(err) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
err.to_string(),
)))
}
Ok(container_id) => {
info!(sl!(), "get_oom_event return {}", &container_id);
let mut resp = OOMEvent::new();
resp.container_id = container_id;
return Ok(resp);
}
}
}
}
#[derive(Clone)]

View File

@ -10,12 +10,13 @@ use crate::namespace::Namespace;
use crate::namespace::NSTYPEPID;
use crate::network::Network;
use anyhow::{anyhow, Context, Result};
use cgroups;
use libc::pid_t;
use netlink::{RtnlHandle, NETLINK_ROUTE};
use oci::{Hook, Hooks};
use protocols::agent::OnlineCPUMemRequest;
use regex::Regex;
use rustjail::cgroups;
use rustjail::cgroups as rustjail_cgroups;
use rustjail::container::BaseContainer;
use rustjail::container::LinuxContainer;
use rustjail::process::Process;
@ -24,7 +25,8 @@ use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::{thread, time};
#[derive(Debug)]
@ -46,12 +48,16 @@ pub struct Sandbox {
pub sender: Option<Sender<i32>>,
pub rtnl: Option<RtnlHandle>,
pub hooks: Option<Hooks>,
pub event_rx: Arc<Mutex<Receiver<String>>>,
pub event_tx: Sender<String>,
}
impl Sandbox {
pub fn new(logger: &Logger) -> Result<Self> {
let fs_type = get_mount_fs_type("/")?;
let logger = logger.new(o!("subsystem" => "sandbox"));
let (tx, rx) = mpsc::channel::<String>();
let event_rx = Arc::new(Mutex::new(rx));
Ok(Sandbox {
logger: logger.clone(),
@ -71,6 +77,8 @@ impl Sandbox {
sender: None,
rtnl: Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()),
hooks: None,
event_rx: event_rx,
event_tx: tx,
})
}
@ -239,7 +247,7 @@ impl Sandbox {
online_memory(&self.logger)?;
}
let cpuset = cgroups::fs::get_guest_cpuset()?;
let cpuset = rustjail_cgroups::fs::get_guest_cpuset()?;
for (_, ctr) in self.containers.iter() {
info!(self.logger, "updating {}", ctr.id.as_str());
@ -302,6 +310,21 @@ impl Sandbox {
Ok(hooks)
}
pub fn run_oom_event_monitor(&self, rx: Receiver<String>, container_id: String) {
let tx = self.event_tx.clone();
let logger = self.logger.clone();
thread::spawn(move || {
for event in rx {
info!(logger, "got an OOM event {:?}", event);
match tx.send(container_id.clone()) {
Err(err) => error!(logger, "failed to send message: {:?}", err),
Ok(_) => {}
}
}
});
}
}
fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Result<i32> {

View File

@ -146,7 +146,7 @@ func watchOOMEvents(ctx context.Context, s *service) {
// If the GetOOMEvent call is not implemented, then the agent is most likely an older version,
// stop attempting to get OOM events.
// for rust agent, the response code is not found
if isGRPCErrorCode(codes.NotFound, err) {
if isGRPCErrorCode(codes.NotFound, err) || err.Error() == "Dead agent" {
return
}
continue

View File

@ -1203,10 +1203,10 @@ func (s *Sandbox) CreateContainer(contConfig ContainerConfig) (VCContainer, erro
// Rollback if error happens.
if err != nil {
logger := s.Logger().WithFields(logrus.Fields{"container-id": c.id, "sandox-id": s.id, "rollback": true})
logger.Warning("Cleaning up partially created container")
logger.WithError(err).Error("Cleaning up partially created container")
if err2 := c.stop(true); err2 != nil {
logger.WithError(err2).Warning("Could not delete container")
logger.WithError(err2).Error("Could not delete container")
}
logger.Debug("Removing stopped container from sandbox store")
@ -1894,9 +1894,11 @@ func (s *Sandbox) updateResources() error {
return err
}
s.Logger().Debugf("Request to hypervisor to update oldCPUs/newCPUs: %d/%d", oldCPUs, newCPUs)
// If the CPUs were increased, ask agent to online them
if oldCPUs < newCPUs {
vcpusAdded := newCPUs - oldCPUs
s.Logger().Debugf("Request to onlineCPUMem with %d CPUs", vcpusAdded)
if err := s.agent.onlineCPUMem(vcpusAdded, true); err != nil {
return err
}