mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-28 19:54:35 +00:00
agent: convert the sl
macros to functions
There is nothing in them that requires them to be macros. Converting them to functions allows for better error messages. Fixes: #7201 Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
parent
0860fbd410
commit
0504bd7254
@ -39,11 +39,9 @@ use std::path::Path;
|
||||
|
||||
const GUEST_CPUS_PATH: &str = "/sys/devices/system/cpu/online";
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(o!("subsystem" => "cgroups"))
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger().new(o!("subsystem" => "cgroups"))
|
||||
}
|
||||
|
||||
macro_rules! get_controller_or_return_singular_none {
|
||||
@ -82,7 +80,7 @@ impl CgroupManager for Manager {
|
||||
|
||||
fn set(&self, r: &LinuxResources, update: bool) -> Result<()> {
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"cgroup manager set resources for container. Resources input {:?}", r
|
||||
);
|
||||
|
||||
@ -120,7 +118,7 @@ impl CgroupManager for Manager {
|
||||
|
||||
// set devices resources
|
||||
set_devices_resources(&self.cgroup, &r.devices, res);
|
||||
info!(sl!(), "resources after processed {:?}", res);
|
||||
info!(sl(), "resources after processed {:?}", res);
|
||||
|
||||
// apply resources
|
||||
self.cgroup.apply(res)?;
|
||||
@ -197,7 +195,7 @@ impl CgroupManager for Manager {
|
||||
if guest_cpuset.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
info!(sl!(), "update_cpuset_path to: {}", guest_cpuset);
|
||||
info!(sl(), "update_cpuset_path to: {}", guest_cpuset);
|
||||
|
||||
let h = cgroups::hierarchies::auto();
|
||||
let root_cg = h.root_control_group();
|
||||
@ -205,12 +203,12 @@ impl CgroupManager for Manager {
|
||||
let root_cpuset_controller: &CpuSetController = root_cg.controller_of().unwrap();
|
||||
let path = root_cpuset_controller.path();
|
||||
let root_path = Path::new(path);
|
||||
info!(sl!(), "root cpuset path: {:?}", &path);
|
||||
info!(sl(), "root cpuset path: {:?}", &path);
|
||||
|
||||
let container_cpuset_controller: &CpuSetController = self.cgroup.controller_of().unwrap();
|
||||
let path = container_cpuset_controller.path();
|
||||
let container_path = Path::new(path);
|
||||
info!(sl!(), "container cpuset path: {:?}", &path);
|
||||
info!(sl(), "container cpuset path: {:?}", &path);
|
||||
|
||||
let mut paths = vec![];
|
||||
for ancestor in container_path.ancestors() {
|
||||
@ -219,7 +217,7 @@ impl CgroupManager for Manager {
|
||||
}
|
||||
paths.push(ancestor);
|
||||
}
|
||||
info!(sl!(), "parent paths to update cpuset: {:?}", &paths);
|
||||
info!(sl(), "parent paths to update cpuset: {:?}", &paths);
|
||||
|
||||
let mut i = paths.len();
|
||||
loop {
|
||||
@ -233,7 +231,7 @@ impl CgroupManager for Manager {
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.trim_start_matches(root_path.to_str().unwrap());
|
||||
info!(sl!(), "updating cpuset for parent path {:?}", &r_path);
|
||||
info!(sl(), "updating cpuset for parent path {:?}", &r_path);
|
||||
let cg = new_cgroup(cgroups::hierarchies::auto(), r_path)?;
|
||||
let cpuset_controller: &CpuSetController = cg.controller_of().unwrap();
|
||||
cpuset_controller.set_cpus(guest_cpuset)?;
|
||||
@ -241,7 +239,7 @@ impl CgroupManager for Manager {
|
||||
|
||||
if !container_cpuset.is_empty() {
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"updating cpuset for container path: {:?} cpuset: {}",
|
||||
&container_path,
|
||||
container_cpuset
|
||||
@ -276,7 +274,7 @@ fn set_network_resources(
|
||||
network: &LinuxNetwork,
|
||||
res: &mut cgroups::Resources,
|
||||
) {
|
||||
info!(sl!(), "cgroup manager set network");
|
||||
info!(sl(), "cgroup manager set network");
|
||||
|
||||
// set classid
|
||||
// description can be found at https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v1/net_cls.html
|
||||
@ -303,7 +301,7 @@ fn set_devices_resources(
|
||||
device_resources: &[LinuxDeviceCgroup],
|
||||
res: &mut cgroups::Resources,
|
||||
) {
|
||||
info!(sl!(), "cgroup manager set devices");
|
||||
info!(sl(), "cgroup manager set devices");
|
||||
let mut devices = vec![];
|
||||
|
||||
for d in device_resources.iter() {
|
||||
@ -332,7 +330,7 @@ fn set_hugepages_resources(
|
||||
hugepage_limits: &[LinuxHugepageLimit],
|
||||
res: &mut cgroups::Resources,
|
||||
) {
|
||||
info!(sl!(), "cgroup manager set hugepage");
|
||||
info!(sl(), "cgroup manager set hugepage");
|
||||
let mut limits = vec![];
|
||||
let hugetlb_controller = cg.controller_of::<HugeTlbController>();
|
||||
|
||||
@ -346,7 +344,7 @@ fn set_hugepages_resources(
|
||||
limits.push(hr);
|
||||
} else {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"{} page size support cannot be verified, dropping requested limit", l.page_size
|
||||
);
|
||||
}
|
||||
@ -359,7 +357,7 @@ fn set_block_io_resources(
|
||||
blkio: &LinuxBlockIo,
|
||||
res: &mut cgroups::Resources,
|
||||
) {
|
||||
info!(sl!(), "cgroup manager set block io");
|
||||
info!(sl(), "cgroup manager set block io");
|
||||
|
||||
res.blkio.weight = blkio.weight;
|
||||
res.blkio.leaf_weight = blkio.leaf_weight;
|
||||
@ -387,13 +385,13 @@ fn set_block_io_resources(
|
||||
}
|
||||
|
||||
fn set_cpu_resources(cg: &cgroups::Cgroup, cpu: &LinuxCpu) -> Result<()> {
|
||||
info!(sl!(), "cgroup manager set cpu");
|
||||
info!(sl(), "cgroup manager set cpu");
|
||||
|
||||
let cpuset_controller: &CpuSetController = cg.controller_of().unwrap();
|
||||
|
||||
if !cpu.cpus.is_empty() {
|
||||
if let Err(e) = cpuset_controller.set_cpus(&cpu.cpus) {
|
||||
warn!(sl!(), "write cpuset failed: {:?}", e);
|
||||
warn!(sl(), "write cpuset failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -424,7 +422,7 @@ fn set_cpu_resources(cg: &cgroups::Cgroup, cpu: &LinuxCpu) -> Result<()> {
|
||||
}
|
||||
|
||||
fn set_memory_resources(cg: &cgroups::Cgroup, memory: &LinuxMemory, update: bool) -> Result<()> {
|
||||
info!(sl!(), "cgroup manager set memory");
|
||||
info!(sl(), "cgroup manager set memory");
|
||||
let mem_controller: &MemController = cg.controller_of().unwrap();
|
||||
|
||||
if !update {
|
||||
@ -493,7 +491,7 @@ fn set_memory_resources(cg: &cgroups::Cgroup, memory: &LinuxMemory, update: bool
|
||||
}
|
||||
|
||||
fn set_pids_resources(cg: &cgroups::Cgroup, pids: &LinuxPids) -> Result<()> {
|
||||
info!(sl!(), "cgroup manager set pids");
|
||||
info!(sl(), "cgroup manager set pids");
|
||||
let pid_controller: &PidController = cg.controller_of().unwrap();
|
||||
let v = if pids.limit > 0 {
|
||||
MaxValue::Value(pids.limit)
|
||||
@ -962,7 +960,7 @@ pub fn get_paths() -> Result<HashMap<String, String>> {
|
||||
for l in fs::read_to_string(PATHS)?.lines() {
|
||||
let fl: Vec<&str> = l.split(':').collect();
|
||||
if fl.len() != 3 {
|
||||
info!(sl!(), "Corrupted cgroup data!");
|
||||
info!(sl(), "Corrupted cgroup data!");
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -983,7 +981,7 @@ pub fn get_mounts(paths: &HashMap<String, String>) -> Result<HashMap<String, Str
|
||||
let post: Vec<&str> = p[1].split(' ').collect();
|
||||
|
||||
if post.len() != 3 {
|
||||
warn!(sl!(), "can't parse {} line {:?}", MOUNTS, l);
|
||||
warn!(sl(), "can't parse {} line {:?}", MOUNTS, l);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -16,11 +16,9 @@ use inotify::{Inotify, WatchMask};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc::{channel, Receiver};
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(o!("subsystem" => "cgroups_notifier"))
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger().new(o!("subsystem" => "cgroups_notifier"))
|
||||
}
|
||||
|
||||
pub async fn notify_oom(cid: &str, cg_dir: String) -> Result<Receiver<String>> {
|
||||
@ -38,7 +36,7 @@ pub async fn notify_oom(cid: &str, cg_dir: String) -> Result<Receiver<String>> {
|
||||
fn get_value_from_cgroup(path: &Path, key: &str) -> Result<i64> {
|
||||
let content = fs::read_to_string(path)?;
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"get_value_from_cgroup file: {:?}, content: {}", &path, &content
|
||||
);
|
||||
|
||||
@ -67,11 +65,11 @@ async fn register_memory_event_v2(
|
||||
let event_control_path = Path::new(&cg_dir).join(memory_event_name);
|
||||
let cgroup_event_control_path = Path::new(&cg_dir).join(cgroup_event_name);
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"register_memory_event_v2 event_control_path: {:?}", &event_control_path
|
||||
);
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"register_memory_event_v2 cgroup_event_control_path: {:?}", &cgroup_event_control_path
|
||||
);
|
||||
|
||||
@ -82,8 +80,8 @@ async fn register_memory_event_v2(
|
||||
// Because no `unix.IN_DELETE|unix.IN_DELETE_SELF` event for cgroup file system, so watching all process exited
|
||||
let cg_wd = inotify.add_watch(&cgroup_event_control_path, WatchMask::MODIFY)?;
|
||||
|
||||
info!(sl!(), "ev_wd: {:?}", ev_wd);
|
||||
info!(sl!(), "cg_wd: {:?}", cg_wd);
|
||||
info!(sl(), "ev_wd: {:?}", ev_wd);
|
||||
info!(sl(), "cg_wd: {:?}", cg_wd);
|
||||
|
||||
let (sender, receiver) = channel(100);
|
||||
let containere_id = containere_id.to_string();
|
||||
@ -97,17 +95,17 @@ async fn register_memory_event_v2(
|
||||
while let Some(event_or_error) = stream.next().await {
|
||||
let event = event_or_error.unwrap();
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"container[{}] get event for container: {:?}", &containere_id, &event
|
||||
);
|
||||
// info!("is1: {}", event.wd == wd1);
|
||||
info!(sl!(), "event.wd: {:?}", event.wd);
|
||||
info!(sl(), "event.wd: {:?}", event.wd);
|
||||
|
||||
if event.wd == ev_wd {
|
||||
let oom = get_value_from_cgroup(&event_control_path, "oom_kill");
|
||||
if oom.unwrap_or(0) > 0 {
|
||||
let _ = sender.send(containere_id.clone()).await.map_err(|e| {
|
||||
error!(sl!(), "send containere_id failed, error: {:?}", e);
|
||||
error!(sl(), "send containere_id failed, error: {:?}", e);
|
||||
});
|
||||
return;
|
||||
}
|
||||
@ -171,13 +169,13 @@ async fn register_memory_event(
|
||||
let mut buf = [0u8; 8];
|
||||
match eventfd_stream.read(&mut buf).await {
|
||||
Err(err) => {
|
||||
warn!(sl!(), "failed to read from eventfd: {:?}", err);
|
||||
warn!(sl(), "failed to read from eventfd: {:?}", err);
|
||||
return;
|
||||
}
|
||||
Ok(_) => {
|
||||
let content = fs::read_to_string(path.clone());
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"cgroup event for container: {}, path: {:?}, content: {:?}",
|
||||
&containere_id,
|
||||
&path,
|
||||
@ -193,7 +191,7 @@ async fn register_memory_event(
|
||||
}
|
||||
|
||||
let _ = sender.send(containere_id.clone()).await.map_err(|e| {
|
||||
error!(sl!(), "send containere_id failed, error: {:?}", e);
|
||||
error!(sl(), "send containere_id failed, error: {:?}", e);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
@ -1596,10 +1596,8 @@ mod tests {
|
||||
use tempfile::tempdir;
|
||||
use test_utils::skip_if_not_root;
|
||||
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger()
|
||||
};
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger()
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -1854,7 +1852,7 @@ mod tests {
|
||||
let _ = new_linux_container_and_then(|mut c: LinuxContainer| {
|
||||
c.processes.insert(
|
||||
1,
|
||||
Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap(),
|
||||
Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap(),
|
||||
);
|
||||
let p = c.get_process("123");
|
||||
assert!(p.is_ok(), "Expecting Ok, Got {:?}", p);
|
||||
@ -1881,7 +1879,7 @@ mod tests {
|
||||
let (c, _dir) = new_linux_container();
|
||||
let ret = c
|
||||
.unwrap()
|
||||
.start(Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap())
|
||||
.start(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap())
|
||||
.await;
|
||||
assert!(ret.is_err(), "Expecting Err, Got {:?}", ret);
|
||||
}
|
||||
@ -1891,7 +1889,7 @@ mod tests {
|
||||
let (c, _dir) = new_linux_container();
|
||||
let ret = c
|
||||
.unwrap()
|
||||
.run(Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap())
|
||||
.run(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap())
|
||||
.await;
|
||||
assert!(ret.is_err(), "Expecting Err, Got {:?}", ret);
|
||||
}
|
||||
|
@ -26,11 +26,9 @@ use oci::{LinuxDeviceCgroup, LinuxResources, Spec};
|
||||
use protocols::agent::Device;
|
||||
use tracing::instrument;
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(o!("subsystem" => "device"))
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger().new(o!("subsystem" => "device"))
|
||||
}
|
||||
|
||||
const VM_ROOTFS: &str = "/";
|
||||
@ -78,7 +76,7 @@ where
|
||||
{
|
||||
let syspci = Path::new(&syspci);
|
||||
let drv = drv.as_ref();
|
||||
info!(sl!(), "rebind_pci_driver: {} => {:?}", dev, drv);
|
||||
info!(sl(), "rebind_pci_driver: {} => {:?}", dev, drv);
|
||||
|
||||
let devpath = syspci.join("devices").join(dev.to_string());
|
||||
let overridepath = &devpath.join("driver_override");
|
||||
@ -606,7 +604,7 @@ fn update_spec_devices(spec: &mut Spec, mut updates: HashMap<&str, DevUpdate>) -
|
||||
let host_minor = specdev.minor;
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"update_spec_devices() updating device";
|
||||
"container_path" => &specdev.path,
|
||||
"type" => &specdev.r#type,
|
||||
@ -657,7 +655,7 @@ fn update_spec_devices(spec: &mut Spec, mut updates: HashMap<&str, DevUpdate>) -
|
||||
if let Some(update) = res_updates.get(&(r.r#type.as_str(), host_major, host_minor))
|
||||
{
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"update_spec_devices() updating resource";
|
||||
"type" => &r.r#type,
|
||||
"host_major" => host_major,
|
||||
@ -921,7 +919,7 @@ pub async fn add_devices(
|
||||
#[instrument]
|
||||
async fn add_device(device: &Device, sandbox: &Arc<Mutex<Sandbox>>) -> Result<SpecUpdate> {
|
||||
// log before validation to help with debugging gRPC protocol version differences.
|
||||
info!(sl!(), "device-id: {}, device-type: {}, device-vm-path: {}, device-container-path: {}, device-options: {:?}",
|
||||
info!(sl(), "device-id: {}, device-type: {}, device-vm-path: {}, device-container-path: {}, device-options: {:?}",
|
||||
device.id, device.type_, device.vm_path, device.container_path, device.options);
|
||||
|
||||
if device.type_.is_empty() {
|
||||
|
@ -15,11 +15,9 @@ use tracing::instrument;
|
||||
const NAMESPACE_KATA_AGENT: &str = "kata_agent";
|
||||
const NAMESPACE_KATA_GUEST: &str = "kata_guest";
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(o!("subsystem" => "metrics"))
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger().new(o!("subsystem" => "metrics"))
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@ -139,7 +137,7 @@ fn update_agent_metrics() -> Result<()> {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
// FIXME: return Ok for all errors?
|
||||
warn!(sl!(), "failed to create process instance: {:?}", e);
|
||||
warn!(sl(), "failed to create process instance: {:?}", e);
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
@ -160,7 +158,7 @@ fn update_agent_metrics() -> Result<()> {
|
||||
// io
|
||||
match me.io() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get process io stat: {:?}", err);
|
||||
info!(sl(), "failed to get process io stat: {:?}", err);
|
||||
}
|
||||
Ok(io) => {
|
||||
set_gauge_vec_proc_io(&AGENT_IO_STAT, &io);
|
||||
@ -169,7 +167,7 @@ fn update_agent_metrics() -> Result<()> {
|
||||
|
||||
match me.stat() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get process stat: {:?}", err);
|
||||
info!(sl(), "failed to get process stat: {:?}", err);
|
||||
}
|
||||
Ok(stat) => {
|
||||
set_gauge_vec_proc_stat(&AGENT_PROC_STAT, &stat);
|
||||
@ -177,7 +175,7 @@ fn update_agent_metrics() -> Result<()> {
|
||||
}
|
||||
|
||||
match me.status() {
|
||||
Err(err) => error!(sl!(), "failed to get process status: {:?}", err),
|
||||
Err(err) => error!(sl(), "failed to get process status: {:?}", err),
|
||||
Ok(status) => set_gauge_vec_proc_status(&AGENT_PROC_STATUS, &status),
|
||||
}
|
||||
|
||||
@ -189,7 +187,7 @@ fn update_guest_metrics() {
|
||||
// try get load and task info
|
||||
match procfs::LoadAverage::new() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest LoadAverage: {:?}", err);
|
||||
info!(sl(), "failed to get guest LoadAverage: {:?}", err);
|
||||
}
|
||||
Ok(load) => {
|
||||
GUEST_LOAD
|
||||
@ -209,7 +207,7 @@ fn update_guest_metrics() {
|
||||
// try to get disk stats
|
||||
match procfs::diskstats() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest diskstats: {:?}", err);
|
||||
info!(sl(), "failed to get guest diskstats: {:?}", err);
|
||||
}
|
||||
Ok(diskstats) => {
|
||||
for diskstat in diskstats {
|
||||
@ -221,7 +219,7 @@ fn update_guest_metrics() {
|
||||
// try to get vm stats
|
||||
match procfs::vmstat() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest vmstat: {:?}", err);
|
||||
info!(sl(), "failed to get guest vmstat: {:?}", err);
|
||||
}
|
||||
Ok(vmstat) => {
|
||||
for (k, v) in vmstat {
|
||||
@ -233,7 +231,7 @@ fn update_guest_metrics() {
|
||||
// cpu stat
|
||||
match procfs::KernelStats::new() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest KernelStats: {:?}", err);
|
||||
info!(sl(), "failed to get guest KernelStats: {:?}", err);
|
||||
}
|
||||
Ok(kernel_stats) => {
|
||||
set_gauge_vec_cpu_time(&GUEST_CPU_TIME, "total", &kernel_stats.total);
|
||||
@ -246,7 +244,7 @@ fn update_guest_metrics() {
|
||||
// try to get net device stats
|
||||
match procfs::net::dev_status() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest net::dev_status: {:?}", err);
|
||||
info!(sl(), "failed to get guest net::dev_status: {:?}", err);
|
||||
}
|
||||
Ok(devs) => {
|
||||
// netdev: map[string]procfs::net::DeviceStatus
|
||||
@ -259,7 +257,7 @@ fn update_guest_metrics() {
|
||||
// get statistics about memory from /proc/meminfo
|
||||
match procfs::Meminfo::new() {
|
||||
Err(err) => {
|
||||
info!(sl!(), "failed to get guest Meminfo: {:?}", err);
|
||||
info!(sl(), "failed to get guest Meminfo: {:?}", err);
|
||||
}
|
||||
Ok(meminfo) => {
|
||||
set_gauge_vec_meminfo(&GUEST_MEMINFO, &meminfo);
|
||||
|
@ -110,11 +110,9 @@ const ERR_NO_SANDBOX_PIDNS: &str = "Sandbox does not have sandbox_pidns";
|
||||
// not available.
|
||||
const IPTABLES_RESTORE_WAIT_SEC: u64 = 5;
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger()
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger()
|
||||
}
|
||||
|
||||
// Convenience function to wrap an error and response to ttrpc client
|
||||
@ -158,14 +156,14 @@ impl AgentService {
|
||||
let mut oci = match oci_spec.as_mut() {
|
||||
Some(spec) => rustjail::grpc_to_oci(spec),
|
||||
None => {
|
||||
error!(sl!(), "no oci spec in the create container request!");
|
||||
error!(sl(), "no oci spec in the create container request!");
|
||||
return Err(anyhow!(nix::Error::EINVAL));
|
||||
}
|
||||
};
|
||||
|
||||
info!(sl!(), "receive createcontainer, spec: {:?}", &oci);
|
||||
info!(sl(), "receive createcontainer, spec: {:?}", &oci);
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"receive createcontainer, storages: {:?}", &req.storages
|
||||
);
|
||||
|
||||
@ -184,7 +182,7 @@ impl AgentService {
|
||||
// here, the agent will rely on rustjail (using the oci.Mounts
|
||||
// list) to bind mount all of them inside the container.
|
||||
let m = add_storages(
|
||||
sl!(),
|
||||
sl(),
|
||||
req.storages.to_vec(),
|
||||
self.sandbox.clone(),
|
||||
Some(req.container_id.clone()),
|
||||
@ -232,33 +230,33 @@ impl AgentService {
|
||||
};
|
||||
|
||||
let mut ctr: LinuxContainer =
|
||||
LinuxContainer::new(cid.as_str(), CONTAINER_BASE, opts, &sl!())?;
|
||||
LinuxContainer::new(cid.as_str(), CONTAINER_BASE, opts, &sl())?;
|
||||
|
||||
let pipe_size = AGENT_CONFIG.container_pipe_size;
|
||||
|
||||
let p = if let Some(p) = oci.process {
|
||||
Process::new(&sl!(), &p, cid.as_str(), true, pipe_size)?
|
||||
Process::new(&sl(), &p, cid.as_str(), true, pipe_size)?
|
||||
} else {
|
||||
info!(sl!(), "no process configurations!");
|
||||
info!(sl(), "no process configurations!");
|
||||
return Err(anyhow!(nix::Error::EINVAL));
|
||||
};
|
||||
|
||||
// if starting container failed, we will do some rollback work
|
||||
// to ensure no resources are leaked.
|
||||
if let Err(err) = ctr.start(p).await {
|
||||
error!(sl!(), "failed to start container: {:?}", err);
|
||||
error!(sl(), "failed to start container: {:?}", err);
|
||||
if let Err(e) = ctr.destroy().await {
|
||||
error!(sl!(), "failed to destroy container: {:?}", e);
|
||||
error!(sl(), "failed to destroy container: {:?}", e);
|
||||
}
|
||||
if let Err(e) = remove_container_resources(&mut s, &cid) {
|
||||
error!(sl!(), "failed to remove container resources: {:?}", e);
|
||||
error!(sl(), "failed to remove container resources: {:?}", e);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
s.update_shared_pidns(&ctr)?;
|
||||
s.add_container(ctr);
|
||||
info!(sl!(), "created container!");
|
||||
info!(sl(), "created container!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -355,7 +353,7 @@ impl AgentService {
|
||||
let cid = req.container_id.clone();
|
||||
let exec_id = req.exec_id.clone();
|
||||
|
||||
info!(sl!(), "do_exec_process cid: {} eid: {}", cid, exec_id);
|
||||
info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id);
|
||||
|
||||
let s = self.sandbox.clone();
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -370,7 +368,7 @@ impl AgentService {
|
||||
|
||||
let pipe_size = AGENT_CONFIG.container_pipe_size;
|
||||
let ocip = rustjail::process_grpc_to_oci(&process);
|
||||
let p = Process::new(&sl!(), &ocip, exec_id.as_str(), false, pipe_size)?;
|
||||
let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size)?;
|
||||
|
||||
let ctr = sandbox
|
||||
.get_container(&cid)
|
||||
@ -388,7 +386,7 @@ impl AgentService {
|
||||
let s = self.sandbox.clone();
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"signal process";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -410,7 +408,7 @@ impl AgentService {
|
||||
match p.signal(sig) {
|
||||
Err(Errno::ESRCH) => {
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"signal encounter ESRCH, continue";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -426,7 +424,7 @@ impl AgentService {
|
||||
if eid.is_empty() {
|
||||
// eid is empty, signal all the remaining processes in the container cgroup
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"signal all the remaining processes";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -434,7 +432,7 @@ impl AgentService {
|
||||
|
||||
if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"freeze cgroup failed";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -447,7 +445,7 @@ impl AgentService {
|
||||
let res = unsafe { libc::kill(*pid, sig) };
|
||||
if let Err(err) = Errno::result(res).map(drop) {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"signal failed";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -458,7 +456,7 @@ impl AgentService {
|
||||
}
|
||||
if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Thawed).await {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"unfreeze cgroup failed";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone(),
|
||||
@ -503,7 +501,7 @@ impl AgentService {
|
||||
let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"wait process";
|
||||
"container-id" => cid.clone(),
|
||||
"exec-id" => eid.clone()
|
||||
@ -520,9 +518,9 @@ impl AgentService {
|
||||
};
|
||||
|
||||
if let Some(mut exit_rx) = exit_rx {
|
||||
info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid);
|
||||
info!(sl(), "cid {} eid {} waiting for exit signal", &cid, &eid);
|
||||
while exit_rx.changed().await.is_ok() {}
|
||||
info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid);
|
||||
info!(sl(), "cid {} eid {} received exit signal", &cid, &eid);
|
||||
}
|
||||
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -840,8 +838,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
|
||||
// More details in https://github.com/kata-containers/kata-containers/issues/6455#issuecomment-1477137277
|
||||
match stat::stat(Path::new(&m.mount_point)) {
|
||||
Ok(_) => info!(sl!(), "stat {} success", m.mount_point),
|
||||
Err(e) => info!(sl!(), "stat {} failed: {}", m.mount_point, e),
|
||||
Ok(_) => info!(sl(), "stat {} success", m.mount_point),
|
||||
Err(e) => info!(sl(), "stat {} failed: {}", m.mount_point, e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1024,7 +1022,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "update_mounts", req);
|
||||
is_allowed(&req)?;
|
||||
|
||||
match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await {
|
||||
match update_ephemeral_mounts(sl(), req.storages.to_vec(), self.sandbox.clone()).await {
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
Err(e) => Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
@ -1041,7 +1039,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "get_iptables", req);
|
||||
is_allowed(&req)?;
|
||||
|
||||
info!(sl!(), "get_ip_tables: request received");
|
||||
info!(sl(), "get_ip_tables: request received");
|
||||
|
||||
// the binary could exists in either /usr/sbin or /sbin
|
||||
// here check both of the places and return the one exists
|
||||
@ -1066,7 +1064,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
..Default::default()
|
||||
}),
|
||||
Err(e) => {
|
||||
warn!(sl!(), "failed to run {}: {:?}", cmd, e.kind());
|
||||
warn!(sl(), "failed to run {}: {:?}", cmd, e.kind());
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
}
|
||||
@ -1080,7 +1078,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "set_iptables", req);
|
||||
is_allowed(&req)?;
|
||||
|
||||
info!(sl!(), "set_ip_tables request received");
|
||||
info!(sl(), "set_ip_tables request received");
|
||||
|
||||
// the binary could exists in both /usr/sbin and /sbin
|
||||
// here check both of the places and return the one exists
|
||||
@ -1109,7 +1107,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
{
|
||||
Ok(child) => child,
|
||||
Err(e) => {
|
||||
warn!(sl!(), "failure to spawn {}: {:?}", cmd, e.kind());
|
||||
warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
};
|
||||
@ -1130,12 +1128,12 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
let _ = match stdin.write_all(&req.data) {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
warn!(sl!(), "error writing stdin: {:?}", e.kind());
|
||||
warn!(sl(), "error writing stdin: {:?}", e.kind());
|
||||
return;
|
||||
}
|
||||
};
|
||||
if tx.send(1).is_err() {
|
||||
warn!(sl!(), "stdin writer thread receiver dropped");
|
||||
warn!(sl(), "stdin writer thread receiver dropped");
|
||||
};
|
||||
});
|
||||
|
||||
@ -1160,7 +1158,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"failure waiting for spawned {} to complete: {:?}",
|
||||
cmd,
|
||||
e.kind()
|
||||
@ -1170,7 +1168,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
};
|
||||
|
||||
if !output.status.success() {
|
||||
warn!(sl!(), "{} failed: {:?}", cmd, output.stderr);
|
||||
warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
|
||||
return Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!(
|
||||
@ -1259,7 +1257,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
if !req.guest_hook_path.is_empty() {
|
||||
let _ = s.add_hooks(&req.guest_hook_path).map_err(|e| {
|
||||
error!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"add guest hook {} failed: {:?}", req.guest_hook_path, e
|
||||
);
|
||||
});
|
||||
@ -1278,7 +1276,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
}
|
||||
|
||||
match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await {
|
||||
match add_storages(sl(), req.storages.to_vec(), self.sandbox.clone(), None).await {
|
||||
Ok(m) => {
|
||||
let sandbox = self.sandbox.clone();
|
||||
let mut s = sandbox.lock().await;
|
||||
@ -1287,7 +1285,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
};
|
||||
|
||||
match setup_guest_dns(sl!(), req.dns.to_vec()) {
|
||||
match setup_guest_dns(sl(), req.dns.to_vec()) {
|
||||
Ok(_) => {
|
||||
let sandbox = self.sandbox.clone();
|
||||
let mut s = sandbox.lock().await;
|
||||
@ -1412,7 +1410,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "get_guest_details", req);
|
||||
is_allowed(&req)?;
|
||||
|
||||
info!(sl!(), "get guest details!");
|
||||
info!(sl(), "get guest details!");
|
||||
let mut resp = GuestDetailsResponse::new();
|
||||
// to get memory block size
|
||||
match get_memory_info(
|
||||
@ -1426,7 +1424,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
resp.support_mem_hotplug_probe = v;
|
||||
}
|
||||
Err(e) => {
|
||||
info!(sl!(), "fail to get memory info!");
|
||||
info!(sl(), "fail to get memory info!");
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
}
|
||||
@ -1511,7 +1509,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
drop(sandbox);
|
||||
|
||||
if let Some(container_id) = event_rx.recv().await {
|
||||
info!(sl!(), "get_oom_event return {}", &container_id);
|
||||
info!(sl(), "get_oom_event return {}", &container_id);
|
||||
|
||||
let mut resp = OOMEvent::new();
|
||||
resp.container_id = container_id;
|
||||
@ -1530,7 +1528,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "get_volume_stats", req);
|
||||
is_allowed(&req)?;
|
||||
|
||||
info!(sl!(), "get volume stats!");
|
||||
info!(sl(), "get volume stats!");
|
||||
let mut resp = VolumeStatsResponse::new();
|
||||
|
||||
let mut condition = VolumeCondition::new();
|
||||
@ -1541,7 +1539,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
condition.message = String::from("OK");
|
||||
}
|
||||
Err(e) => {
|
||||
info!(sl!(), "failed to open the volume");
|
||||
info!(sl(), "failed to open the volume");
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
};
|
||||
@ -1600,7 +1598,7 @@ impl health_ttrpc::Health for HealthService {
|
||||
_ctx: &TtrpcContext,
|
||||
req: protocols::health::CheckRequest,
|
||||
) -> ttrpc::Result<VersionCheckResponse> {
|
||||
info!(sl!(), "version {:?}", req);
|
||||
info!(sl(), "version {:?}", req);
|
||||
let mut rep = protocols::health::VersionCheckResponse::new();
|
||||
rep.agent_version = AGENT_VERSION.to_string();
|
||||
rep.grpc_version = API_VERSION.to_string();
|
||||
@ -1621,17 +1619,17 @@ fn get_memory_info(
|
||||
match fs::read_to_string(block_size_path) {
|
||||
Ok(v) => {
|
||||
if v.is_empty() {
|
||||
warn!(sl!(), "file {} is empty", block_size_path);
|
||||
warn!(sl(), "file {} is empty", block_size_path);
|
||||
return Err(anyhow!(ERR_INVALID_BLOCK_SIZE));
|
||||
}
|
||||
|
||||
size = u64::from_str_radix(v.trim(), 16).map_err(|_| {
|
||||
warn!(sl!(), "failed to parse the str {} to hex", size);
|
||||
warn!(sl(), "failed to parse the str {} to hex", size);
|
||||
anyhow!(ERR_INVALID_BLOCK_SIZE)
|
||||
})?;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(sl!(), "memory block size error: {:?}", e.kind());
|
||||
warn!(sl(), "memory block size error: {:?}", e.kind());
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
@ -1643,7 +1641,7 @@ fn get_memory_info(
|
||||
match stat::stat(hotplug_probe_path) {
|
||||
Ok(_) => plug = true,
|
||||
Err(e) => {
|
||||
warn!(sl!(), "hotplug memory error: {:?}", e);
|
||||
warn!(sl(), "hotplug memory error: {:?}", e);
|
||||
match e {
|
||||
nix::Error::ENOENT => plug = false,
|
||||
_ => return Err(anyhow!(e)),
|
||||
@ -1739,7 +1737,7 @@ pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str, init_mode: bool) -> R
|
||||
.register_service(aservice)
|
||||
.register_service(hservice);
|
||||
|
||||
info!(sl!(), "ttRPC server started"; "address" => server_address);
|
||||
info!(sl(), "ttRPC server started"; "address" => server_address);
|
||||
|
||||
Ok(server)
|
||||
}
|
||||
@ -1814,7 +1812,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> {
|
||||
for m in cmounts.iter() {
|
||||
if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) {
|
||||
error!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"failed to unset_and_remove_sandbox_storage for container {}, error: {:?}",
|
||||
cid,
|
||||
err
|
||||
@ -1850,7 +1848,7 @@ fn is_signal_handled(proc_status_file: &str, signum: u32) -> bool {
|
||||
return fs::metadata(proc_status_file).is_ok();
|
||||
} else if signum > 64 {
|
||||
// Ensure invalid signum won't break bit shift logic
|
||||
warn!(sl!(), "received invalid signum {}", signum);
|
||||
warn!(sl(), "received invalid signum {}", signum);
|
||||
return false;
|
||||
} else {
|
||||
(signum - 1).into()
|
||||
@ -1860,7 +1858,7 @@ fn is_signal_handled(proc_status_file: &str, signum: u32) -> bool {
|
||||
let file = match File::open(proc_status_file) {
|
||||
Ok(f) => f,
|
||||
Err(_) => {
|
||||
warn!(sl!(), "failed to open file {}", proc_status_file);
|
||||
warn!(sl(), "failed to open file {}", proc_status_file);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
@ -2013,7 +2011,7 @@ pub fn setup_bundle(cid: &str, spec: &mut Spec) -> Result<PathBuf> {
|
||||
"bind",
|
||||
MsFlags::MS_BIND,
|
||||
"",
|
||||
&sl!(),
|
||||
&sl(),
|
||||
)?;
|
||||
|
||||
let rootfs_path_name = rootfs_path
|
||||
@ -2048,7 +2046,7 @@ fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> {
|
||||
}
|
||||
|
||||
info!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"load_kernel_module {}: {:?}", module.name, module.parameters
|
||||
);
|
||||
|
||||
@ -2834,7 +2832,7 @@ OtherField:other
|
||||
for cmd in iptables_cmd_list {
|
||||
if !check_command(cmd) {
|
||||
warn!(
|
||||
sl!(),
|
||||
sl(),
|
||||
"one or more commands for ip tables test are missing, skip it"
|
||||
);
|
||||
return;
|
||||
|
@ -69,7 +69,7 @@ macro_rules! trace_rpc_call {
|
||||
propagator.extract(&extract_carrier_from_ttrpc($ctx))
|
||||
});
|
||||
|
||||
info!(sl!(), "rpc call from shim to agent: {:?}", $name);
|
||||
info!(sl(), "rpc call from shim to agent: {:?}", $name);
|
||||
|
||||
// generate tracing span
|
||||
let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req);
|
||||
|
@ -19,11 +19,9 @@ use tokio::sync::watch::Receiver;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::instrument;
|
||||
|
||||
// Convenience macro to obtain the scope logger
|
||||
macro_rules! sl {
|
||||
() => {
|
||||
slog_scope::logger().new(o!("subsystem" => "uevent"))
|
||||
};
|
||||
// Convenience function to obtain the scope logger.
|
||||
fn sl() -> slog::Logger {
|
||||
slog_scope::logger().new(o!("subsystem" => "uevent"))
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
@ -120,11 +118,11 @@ pub async fn wait_for_uevent(
|
||||
) -> Result<Uevent> {
|
||||
let logprefix = format!("Waiting for {:?}", &matcher);
|
||||
|
||||
info!(sl!(), "{}", logprefix);
|
||||
info!(sl(), "{}", logprefix);
|
||||
let mut sb = sandbox.lock().await;
|
||||
for uev in sb.uevent_map.values() {
|
||||
if matcher.is_match(uev) {
|
||||
info!(sl!(), "{}: found {:?} in uevent map", logprefix, &uev);
|
||||
info!(sl(), "{}: found {:?} in uevent map", logprefix, &uev);
|
||||
return Ok(uev.clone());
|
||||
}
|
||||
}
|
||||
@ -139,7 +137,7 @@ pub async fn wait_for_uevent(
|
||||
sb.uevent_watchers.push(Some((Box::new(matcher), tx)));
|
||||
drop(sb); // unlock
|
||||
|
||||
info!(sl!(), "{}: waiting on channel", logprefix);
|
||||
info!(sl(), "{}: waiting on channel", logprefix);
|
||||
|
||||
let hotplug_timeout = AGENT_CONFIG.hotplug_timeout;
|
||||
|
||||
@ -157,7 +155,7 @@ pub async fn wait_for_uevent(
|
||||
}
|
||||
};
|
||||
|
||||
info!(sl!(), "{}: found {:?} on channel", logprefix, &uev);
|
||||
info!(sl(), "{}: found {:?} on channel", logprefix, &uev);
|
||||
Ok(uev)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user