From f680fc52bed87a3dfdb31c857108199fae9a8f82 Mon Sep 17 00:00:00 2001 From: Wedson Almeida Filho Date: Wed, 12 Oct 2022 22:03:11 +0100 Subject: [PATCH 1/4] agent: change `AGENT_CONFIG`'s lazy type to just `AgentConfig` Since it is never modified, it doesn't really need a lock of any kind. Removing the `RwLock` wrapper allows us to remove all `.read().await` calls when accessing it. Additionally, `AGENT_CONFIG` already has a static lifetime, so there is no need to wrap it in a ref-counted heap allocation. Fixes: #5409 Signed-off-by: Wedson Almeida Filho --- src/agent/src/main.rs | 15 +++++++-------- src/agent/src/rpc.rs | 10 +++------- src/agent/src/uevent.rs | 2 +- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index f82904804c..58c406fe17 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -65,7 +65,7 @@ use tokio::{ io::AsyncWrite, sync::{ watch::{channel, Receiver}, - Mutex, RwLock, + Mutex, }, task::JoinHandle, }; @@ -83,12 +83,11 @@ cfg_if! { const NAME: &str = "kata-agent"; lazy_static! { - static ref AGENT_CONFIG: Arc> = Arc::new(RwLock::new( + static ref AGENT_CONFIG: AgentConfig = // Note: We can't do AgentOpts.parse() here to send through the processed arguments to AgentConfig // clap::Parser::parse() greedily process all command line input including cargo test parameters, // so should only be used inside main. - AgentConfig::from_cmdline("/proc/cmdline", env::args().collect()).unwrap() - )); + AgentConfig::from_cmdline("/proc/cmdline", env::args().collect()).unwrap(); } #[derive(Parser)] @@ -181,13 +180,13 @@ async fn real_main() -> std::result::Result<(), Box> { lazy_static::initialize(&AGENT_CONFIG); - init_agent_as_init(&logger, AGENT_CONFIG.read().await.unified_cgroup_hierarchy)?; + init_agent_as_init(&logger, AGENT_CONFIG.unified_cgroup_hierarchy)?; drop(logger_async_guard); } else { lazy_static::initialize(&AGENT_CONFIG); } - let config = AGENT_CONFIG.read().await; + let config = &AGENT_CONFIG; let log_vport = config.log_vport as u32; let log_handle = tokio::spawn(create_logger_task(rfd, log_vport, shutdown_rx.clone())); @@ -200,7 +199,7 @@ async fn real_main() -> std::result::Result<(), Box> { let (logger, logger_async_guard) = logging::create_logger(NAME, "agent", config.log_level, writer); - announce(&logger, &config); + announce(&logger, config); // This variable is required as it enables the global (and crucially static) logger, // which is required to satisfy the the lifetime constraints of the auto-generated gRPC code. @@ -228,7 +227,7 @@ async fn real_main() -> std::result::Result<(), Box> { let span_guard = root_span.enter(); // Start the sandbox and wait for its ttRPC server to end - start_sandbox(&logger, &config, init_mode, &mut tasks, shutdown_rx.clone()).await?; + start_sandbox(&logger, config, init_mode, &mut tasks, shutdown_rx.clone()).await?; // Install a NOP logger for the remainder of the shutdown sequence // to ensure any log calls made by local crates using the scope logger diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index dede3204c8..ed7729855b 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -126,11 +126,7 @@ macro_rules! ttrpc_error { macro_rules! is_allowed { ($req:ident) => { - if !AGENT_CONFIG - .read() - .await - .is_allowed_endpoint($req.descriptor_dyn().name()) - { + if !AGENT_CONFIG.is_allowed_endpoint($req.descriptor_dyn().name()) { return Err(ttrpc_error!( ttrpc::Code::UNIMPLEMENTED, format!("{} is blocked", $req.descriptor_dyn().name()), @@ -240,7 +236,7 @@ impl AgentService { let mut ctr: LinuxContainer = LinuxContainer::new(cid.as_str(), CONTAINER_BASE, opts, &sl!())?; - let pipe_size = AGENT_CONFIG.read().await.container_pipe_size; + let pipe_size = AGENT_CONFIG.container_pipe_size; let p = if let Some(p) = oci.process { Process::new(&sl!(), &p, cid.as_str(), true, pipe_size)? @@ -374,7 +370,7 @@ impl AgentService { // Apply any necessary corrections for PCI addresses update_env_pci(&mut process.Env, &sandbox.pcimap)?; - let pipe_size = AGENT_CONFIG.read().await.container_pipe_size; + let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); let p = Process::new(&sl!(), &ocip, exec_id.as_str(), false, pipe_size)?; diff --git a/src/agent/src/uevent.rs b/src/agent/src/uevent.rs index 5d1f554940..cabdd6235c 100644 --- a/src/agent/src/uevent.rs +++ b/src/agent/src/uevent.rs @@ -141,7 +141,7 @@ pub async fn wait_for_uevent( info!(sl!(), "{}: waiting on channel", logprefix); - let hotplug_timeout = AGENT_CONFIG.read().await.hotplug_timeout; + let hotplug_timeout = AGENT_CONFIG.hotplug_timeout; let uev = match tokio::time::timeout(hotplug_timeout, rx).await { Ok(v) => v?, From 0e5d6ce6d7eb3cbff0c449e74710905ab30e935f Mon Sep 17 00:00:00 2001 From: Wedson Almeida Filho Date: Wed, 28 Jun 2023 13:04:31 -0300 Subject: [PATCH 2/4] agent: convert the `is_allowed` macro to a function Having a function allows for better error messages from the type checker and it makes it clearer to callers what can happen. For example: is_allowed!(req); Gives no indication that it may result in an early return, and no simple way for callers to modify the behaviour. It also makes it look like ownership of `req` is being transferred. On the other hand, is_allowed(&req)?; Indicates that `req` is being borrowed (immutably) and may fail. The question mark indicates that the caller wants an early return on failure. Fixes: #7201 Signed-off-by: Wedson Almeida Filho --- src/agent/src/rpc.rs | 90 ++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index ed7729855b..e9081c8df9 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -124,15 +124,15 @@ macro_rules! ttrpc_error { }; } -macro_rules! is_allowed { - ($req:ident) => { - if !AGENT_CONFIG.is_allowed_endpoint($req.descriptor_dyn().name()) { - return Err(ttrpc_error!( - ttrpc::Code::UNIMPLEMENTED, - format!("{} is blocked", $req.descriptor_dyn().name()), - )); - } - }; +fn is_allowed(req: &impl MessageDyn) -> ttrpc::Result<()> { + if !AGENT_CONFIG.is_allowed_endpoint(req.descriptor_dyn().name()) { + Err(ttrpc_error!( + ttrpc::Code::UNIMPLEMENTED, + format!("{} is blocked", req.descriptor_dyn().name()), + )) + } else { + Ok(()) + } } #[derive(Clone, Debug)] @@ -650,7 +650,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "create_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_create_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -663,7 +663,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "start_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_start_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -676,7 +676,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_remove_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), @@ -690,7 +690,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "exec_process", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_exec_process(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -703,7 +703,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "signal_process", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_signal_process(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -716,7 +716,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "wait_process", req); - is_allowed!(req); + is_allowed(&req)?; self.do_wait_process(req) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -728,7 +728,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let res = req.resources; @@ -764,7 +764,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::StatsContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "stats_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -786,7 +786,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::PauseContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "pause_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -810,7 +810,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ResumeContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "resume_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -834,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::RemoveStaleVirtiofsShareMountsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); - is_allowed!(req); + is_allowed(&req)?; let mount_infos = parse_mount_table("/proc/self/mountinfo") .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; for m in &mount_infos { @@ -856,7 +856,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::WriteStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_write_stream(req) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -867,7 +867,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_read_stream(req, true) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -878,7 +878,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_read_stream(req, false) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -890,7 +890,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "close_stdin", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let eid = req.exec_id; @@ -917,7 +917,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::TtyWinResizeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "tty_win_resize", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let eid = req.exec_id.clone(); @@ -959,7 +959,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_interface", req); - is_allowed!(req); + is_allowed(&req)?; let interface = req.interface.into_option().ok_or_else(|| { ttrpc_error!( @@ -987,7 +987,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_routes", req); - is_allowed!(req); + is_allowed(&req)?; let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { ttrpc_error!( @@ -1024,7 +1024,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateEphemeralMountsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_mounts", req); - is_allowed!(req); + is_allowed(&req)?; match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await { Ok(_) => Ok(Empty::new()), @@ -1041,7 +1041,7 @@ impl agent_ttrpc::AgentService for AgentService { req: GetIPTablesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_iptables", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get_ip_tables: request received"); @@ -1080,7 +1080,7 @@ impl agent_ttrpc::AgentService for AgentService { req: SetIPTablesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "set_iptables", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "set_ip_tables request received"); @@ -1195,7 +1195,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "list_interfaces", req); - is_allowed!(req); + is_allowed(&req)?; let list = self .sandbox @@ -1223,7 +1223,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "list_routes", req); - is_allowed!(req); + is_allowed(&req)?; let list = self .sandbox @@ -1246,7 +1246,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CreateSandboxRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "create_sandbox", req); - is_allowed!(req); + is_allowed(&req)?; { let sandbox = self.sandbox.clone(); @@ -1311,7 +1311,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::DestroySandboxRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "destroy_sandbox", req); - is_allowed!(req); + is_allowed(&req)?; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -1346,7 +1346,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "add_arp_neighbors", req); - is_allowed!(req); + is_allowed(&req)?; let neighs = req .neighbors @@ -1380,7 +1380,7 @@ impl agent_ttrpc::AgentService for AgentService { ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; let s = Arc::clone(&self.sandbox); let sandbox = s.lock().await; trace_rpc_call!(ctx, "online_cpu_mem", req); @@ -1398,7 +1398,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "reseed_random_dev", req); - is_allowed!(req); + is_allowed(&req)?; random::reseed_rng(req.data.as_slice()) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1412,7 +1412,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::GuestDetailsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_guest_details", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); @@ -1446,7 +1446,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); - is_allowed!(req); + is_allowed(&req)?; do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1460,7 +1460,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "set_guest_date_time", req); - is_allowed!(req); + is_allowed(&req)?; do_set_guest_date_time(req.Sec, req.Usec) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1474,7 +1474,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "copy_file", req); - is_allowed!(req); + is_allowed(&req)?; do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1487,7 +1487,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_metrics", req); - is_allowed!(req); + is_allowed(&req)?; match get_metrics(&req) { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), @@ -1504,7 +1504,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::GetOOMEventRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; let sandbox = self.sandbox.clone(); let s = sandbox.lock().await; let event_rx = &s.event_rx.clone(); @@ -1530,7 +1530,7 @@ impl agent_ttrpc::AgentService for AgentService { req: VolumeStatsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_volume_stats", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get volume stats!"); let mut resp = VolumeStatsResponse::new(); @@ -1571,7 +1571,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::AddSwapRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "add_swap", req); - is_allowed!(req); + is_allowed(&req)?; do_add_swap(&self.sandbox, &req) .await From 0860fbd410341ec82ad89a072091107700fb03fb Mon Sep 17 00:00:00 2001 From: Wedson Almeida Filho Date: Wed, 28 Jun 2023 13:21:00 -0300 Subject: [PATCH 3/4] agent: convert the `ttrpc_error` macro to a function There is nothing in it that requires it to be a macro. Converting it to a function allows for better error messages. Fixes: #7201 Signed-off-by: Wedson Almeida Filho --- src/agent/src/rpc.rs | 134 +++++++++++++++++++++---------------------- 1 file changed, 66 insertions(+), 68 deletions(-) diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index e9081c8df9..ef2c0f9e7f 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -117,16 +117,14 @@ macro_rules! sl { }; } -// Convenience macro to wrap an error and response to ttrpc client -macro_rules! ttrpc_error { - ($code:path, $err:expr $(,)?) => { - get_rpc_status($code, format!("{:?}", $err)) - }; +// Convenience function to wrap an error and response to ttrpc client +fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error { + get_rpc_status(code, format!("{:?}", err)) } fn is_allowed(req: &impl MessageDyn) -> ttrpc::Result<()> { if !AGENT_CONFIG.is_allowed_endpoint(req.descriptor_dyn().name()) { - Err(ttrpc_error!( + Err(ttrpc_error( ttrpc::Code::UNIMPLEMENTED, format!("{} is blocked", req.descriptor_dyn().name()), )) @@ -652,7 +650,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "create_container", req); is_allowed(&req)?; match self.do_create_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -665,7 +663,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "start_container", req); is_allowed(&req)?; match self.do_start_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -679,7 +677,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; match self.do_remove_container(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -692,7 +690,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "exec_process", req); is_allowed(&req)?; match self.do_exec_process(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -705,7 +703,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "signal_process", req); is_allowed(&req)?; match self.do_signal_process(req).await { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), } } @@ -719,7 +717,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_wait_process(req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn update_container( @@ -736,7 +734,7 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) @@ -748,7 +746,7 @@ impl agent_ttrpc::AgentService for AgentService { let oci_res = rustjail::resources_grpc_to_oci(res); match ctr.set(oci_res) { Err(e) => { - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } Ok(_) => return Ok(resp), @@ -770,14 +768,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(&cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.stats() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn pause_container( @@ -792,14 +790,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.pause() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -816,14 +814,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = s.lock().await; let ctr = sandbox.get_container(cid).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; ctr.resume() - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -836,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); is_allowed(&req)?; let mount_infos = parse_mount_table("/proc/self/mountinfo") - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; for m in &mount_infos { if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) { // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more. @@ -859,7 +857,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_write_stream(req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn read_stdout( @@ -870,7 +868,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_read_stream(req, true) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn read_stderr( @@ -881,7 +879,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; self.do_read_stream(req, false) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e)) } async fn close_stdin( @@ -900,7 +898,7 @@ impl agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("invalid argument: {:?}", e), ) @@ -926,7 +924,7 @@ impl agent_ttrpc::AgentService for AgentService { let p = sandbox .find_container_process(cid.as_str(), eid.as_str()) .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::UNAVAILABLE, format!("invalid argument: {:?}", e), ) @@ -943,11 +941,11 @@ impl agent_ttrpc::AgentService for AgentService { let err = libc::ioctl(fd, TIOCSWINSZ, &win); Errno::result(err).map(drop).map_err(|e| { - ttrpc_error!(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) + ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)) })?; } } else { - return Err(ttrpc_error!(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); + return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); } Ok(Empty::new()) @@ -962,7 +960,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; let interface = req.interface.into_option().ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty update interface request".to_string(), ) @@ -975,7 +973,7 @@ impl agent_ttrpc::AgentService for AgentService { .update_interface(&interface) .await .map_err(|e| { - ttrpc_error!(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) + ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) })?; Ok(interface) @@ -990,7 +988,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty update routes request".to_string(), ) @@ -999,14 +997,14 @@ impl agent_ttrpc::AgentService for AgentService { let mut sandbox = self.sandbox.lock().await; sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to update routes: {:?}", e), ) })?; let list = sandbox.rtnl.list_routes().await.map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to list routes after update: {:?}", e), ) @@ -1028,7 +1026,7 @@ impl agent_ttrpc::AgentService for AgentService { match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await { Ok(_) => Ok(Empty::new()), - Err(e) => Err(ttrpc_error!( + Err(e) => Err(ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to update mounts: {:?}", e), )), @@ -1069,7 +1067,7 @@ impl agent_ttrpc::AgentService for AgentService { }), Err(e) => { warn!(sl!(), "failed to run {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } } @@ -1112,7 +1110,7 @@ impl agent_ttrpc::AgentService for AgentService { Ok(child) => child, Err(e) => { warn!(sl!(), "failure to spawn {}: {:?}", cmd, e.kind()); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1120,9 +1118,9 @@ impl agent_ttrpc::AgentService for AgentService { Some(si) => si, None => { println!("failed to get stdin from child"); - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "failed to take stdin from child".to_string() + "failed to take stdin from child".to_string(), )); } }; @@ -1145,16 +1143,16 @@ impl agent_ttrpc::AgentService for AgentService { .await .is_err() { - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "timeout waiting for stdin writer to complete".to_string() + "timeout waiting for stdin writer to complete".to_string(), )); } if handle.await.is_err() { - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, - "stdin writer thread failure".to_string() + "stdin writer thread failure".to_string(), )); } @@ -1167,19 +1165,19 @@ impl agent_ttrpc::AgentService for AgentService { cmd, e.kind() ); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; if !output.status.success() { warn!(sl!(), "{} failed: {:?}", cmd, output.stderr); - return Err(ttrpc_error!( + return Err(ttrpc_error( ttrpc::Code::INTERNAL, format!( "{} failed: {:?}", cmd, String::from_utf8_lossy(&output.stderr) - ) + ), )); } @@ -1205,7 +1203,7 @@ impl agent_ttrpc::AgentService for AgentService { .list_interfaces() .await .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to list interfaces: {:?}", e), ) @@ -1232,7 +1230,7 @@ impl agent_ttrpc::AgentService for AgentService { .rtnl .list_routes() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; Ok(protocols::agent::Routes { Routes: list, @@ -1272,12 +1270,12 @@ impl agent_ttrpc::AgentService for AgentService { } for m in req.kernel_modules.iter() { - load_kernel_module(m).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } s.setup_shared_namespaces() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await { @@ -1286,7 +1284,7 @@ impl agent_ttrpc::AgentService for AgentService { let mut s = sandbox.lock().await; s.mounts = m } - Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; match setup_guest_dns(sl!(), req.dns.to_vec()) { @@ -1299,7 +1297,7 @@ impl agent_ttrpc::AgentService for AgentService { .iter() .map(|dns| s.network.set_dns(dns.to_string())); } - Err(e) => return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; Ok(Empty::new()) @@ -1320,7 +1318,7 @@ impl agent_ttrpc::AgentService for AgentService { sandbox .destroy() .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; // Close get_oom_event connection, // otherwise it will block the shutdown of ttrpc. sandbox.event_tx.take(); @@ -1329,13 +1327,13 @@ impl agent_ttrpc::AgentService for AgentService { .sender .take() .ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, "failed to get sandbox sender channel".to_string(), ) })? .send(1) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1353,7 +1351,7 @@ impl agent_ttrpc::AgentService for AgentService { .into_option() .map(|n| n.ARPNeighbors) .ok_or_else(|| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty add arp neighbours request".to_string(), ) @@ -1366,7 +1364,7 @@ impl agent_ttrpc::AgentService for AgentService { .add_arp_neighbors(neighs) .await .map_err(|e| { - ttrpc_error!( + ttrpc_error( ttrpc::Code::INTERNAL, format!("Failed to add ARP neighbours: {:?}", e), ) @@ -1387,7 +1385,7 @@ impl agent_ttrpc::AgentService for AgentService { sandbox .online_cpu_memory(&req) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1401,7 +1399,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; random::reseed_rng(req.data.as_slice()) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1429,7 +1427,7 @@ impl agent_ttrpc::AgentService for AgentService { } Err(e) => { info!(sl!(), "fail to get memory info!"); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } @@ -1449,7 +1447,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1463,7 +1461,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; do_set_guest_date_time(req.Sec, req.Usec) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1476,7 +1474,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "copy_file", req); is_allowed(&req)?; - do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } @@ -1490,7 +1488,7 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; match get_metrics(&req) { - Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(s) => { let mut metrics = Metrics::new(); metrics.set_metrics(s); @@ -1521,7 +1519,7 @@ impl agent_ttrpc::AgentService for AgentService { return Ok(resp); } - Err(ttrpc_error!(ttrpc::Code::INTERNAL, "")) + Err(ttrpc_error(ttrpc::Code::INTERNAL, "")) } async fn get_volume_stats( @@ -1544,7 +1542,7 @@ impl agent_ttrpc::AgentService for AgentService { } Err(e) => { info!(sl!(), "failed to open the volume"); - return Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1553,12 +1551,12 @@ impl agent_ttrpc::AgentService for AgentService { // to get volume capacity stats get_volume_capacity_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; // to get volume inode stats get_volume_inode_stats(&req.volume_guest_path) .map(|u| usage_vec.push(u)) - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; resp.usage = usage_vec; resp.volume_condition = MessageField::some(condition); @@ -1575,7 +1573,7 @@ impl agent_ttrpc::AgentService for AgentService { do_add_swap(&self.sandbox, &req) .await - .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; Ok(Empty::new()) } From 0504bd725420c6a8a0769bea963758d366e0c0f2 Mon Sep 17 00:00:00 2001 From: Wedson Almeida Filho Date: Wed, 28 Jun 2023 13:28:18 -0300 Subject: [PATCH 4/4] agent: convert the `sl` macros to functions There is nothing in them that requires them to be macros. Converting them to functions allows for better error messages. Fixes: #7201 Signed-off-by: Wedson Almeida Filho --- src/agent/rustjail/src/cgroups/fs/mod.rs | 46 ++++----- src/agent/rustjail/src/cgroups/notifier.rs | 30 +++--- src/agent/rustjail/src/container.rs | 12 +-- src/agent/src/device.rs | 16 ++- src/agent/src/metrics.rs | 28 +++-- src/agent/src/rpc.rs | 114 ++++++++++----------- src/agent/src/tracer.rs | 2 +- src/agent/src/uevent.rs | 16 ++- 8 files changed, 125 insertions(+), 139 deletions(-) diff --git a/src/agent/rustjail/src/cgroups/fs/mod.rs b/src/agent/rustjail/src/cgroups/fs/mod.rs index fc023ac61a..6145f5f9c0 100644 --- a/src/agent/rustjail/src/cgroups/fs/mod.rs +++ b/src/agent/rustjail/src/cgroups/fs/mod.rs @@ -39,11 +39,9 @@ use std::path::Path; const GUEST_CPUS_PATH: &str = "/sys/devices/system/cpu/online"; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger().new(o!("subsystem" => "cgroups")) - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "cgroups")) } macro_rules! get_controller_or_return_singular_none { @@ -82,7 +80,7 @@ impl CgroupManager for Manager { fn set(&self, r: &LinuxResources, update: bool) -> Result<()> { info!( - sl!(), + sl(), "cgroup manager set resources for container. Resources input {:?}", r ); @@ -120,7 +118,7 @@ impl CgroupManager for Manager { // set devices resources set_devices_resources(&self.cgroup, &r.devices, res); - info!(sl!(), "resources after processed {:?}", res); + info!(sl(), "resources after processed {:?}", res); // apply resources self.cgroup.apply(res)?; @@ -197,7 +195,7 @@ impl CgroupManager for Manager { if guest_cpuset.is_empty() { return Ok(()); } - info!(sl!(), "update_cpuset_path to: {}", guest_cpuset); + info!(sl(), "update_cpuset_path to: {}", guest_cpuset); let h = cgroups::hierarchies::auto(); let root_cg = h.root_control_group(); @@ -205,12 +203,12 @@ impl CgroupManager for Manager { let root_cpuset_controller: &CpuSetController = root_cg.controller_of().unwrap(); let path = root_cpuset_controller.path(); let root_path = Path::new(path); - info!(sl!(), "root cpuset path: {:?}", &path); + info!(sl(), "root cpuset path: {:?}", &path); let container_cpuset_controller: &CpuSetController = self.cgroup.controller_of().unwrap(); let path = container_cpuset_controller.path(); let container_path = Path::new(path); - info!(sl!(), "container cpuset path: {:?}", &path); + info!(sl(), "container cpuset path: {:?}", &path); let mut paths = vec![]; for ancestor in container_path.ancestors() { @@ -219,7 +217,7 @@ impl CgroupManager for Manager { } paths.push(ancestor); } - info!(sl!(), "parent paths to update cpuset: {:?}", &paths); + info!(sl(), "parent paths to update cpuset: {:?}", &paths); let mut i = paths.len(); loop { @@ -233,7 +231,7 @@ impl CgroupManager for Manager { .to_str() .unwrap() .trim_start_matches(root_path.to_str().unwrap()); - info!(sl!(), "updating cpuset for parent path {:?}", &r_path); + info!(sl(), "updating cpuset for parent path {:?}", &r_path); let cg = new_cgroup(cgroups::hierarchies::auto(), r_path)?; let cpuset_controller: &CpuSetController = cg.controller_of().unwrap(); cpuset_controller.set_cpus(guest_cpuset)?; @@ -241,7 +239,7 @@ impl CgroupManager for Manager { if !container_cpuset.is_empty() { info!( - sl!(), + sl(), "updating cpuset for container path: {:?} cpuset: {}", &container_path, container_cpuset @@ -276,7 +274,7 @@ fn set_network_resources( network: &LinuxNetwork, res: &mut cgroups::Resources, ) { - info!(sl!(), "cgroup manager set network"); + info!(sl(), "cgroup manager set network"); // set classid // description can be found at https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v1/net_cls.html @@ -303,7 +301,7 @@ fn set_devices_resources( device_resources: &[LinuxDeviceCgroup], res: &mut cgroups::Resources, ) { - info!(sl!(), "cgroup manager set devices"); + info!(sl(), "cgroup manager set devices"); let mut devices = vec![]; for d in device_resources.iter() { @@ -332,7 +330,7 @@ fn set_hugepages_resources( hugepage_limits: &[LinuxHugepageLimit], res: &mut cgroups::Resources, ) { - info!(sl!(), "cgroup manager set hugepage"); + info!(sl(), "cgroup manager set hugepage"); let mut limits = vec![]; let hugetlb_controller = cg.controller_of::(); @@ -346,7 +344,7 @@ fn set_hugepages_resources( limits.push(hr); } else { warn!( - sl!(), + sl(), "{} page size support cannot be verified, dropping requested limit", l.page_size ); } @@ -359,7 +357,7 @@ fn set_block_io_resources( blkio: &LinuxBlockIo, res: &mut cgroups::Resources, ) { - info!(sl!(), "cgroup manager set block io"); + info!(sl(), "cgroup manager set block io"); res.blkio.weight = blkio.weight; res.blkio.leaf_weight = blkio.leaf_weight; @@ -387,13 +385,13 @@ fn set_block_io_resources( } fn set_cpu_resources(cg: &cgroups::Cgroup, cpu: &LinuxCpu) -> Result<()> { - info!(sl!(), "cgroup manager set cpu"); + info!(sl(), "cgroup manager set cpu"); let cpuset_controller: &CpuSetController = cg.controller_of().unwrap(); if !cpu.cpus.is_empty() { if let Err(e) = cpuset_controller.set_cpus(&cpu.cpus) { - warn!(sl!(), "write cpuset failed: {:?}", e); + warn!(sl(), "write cpuset failed: {:?}", e); } } @@ -424,7 +422,7 @@ fn set_cpu_resources(cg: &cgroups::Cgroup, cpu: &LinuxCpu) -> Result<()> { } fn set_memory_resources(cg: &cgroups::Cgroup, memory: &LinuxMemory, update: bool) -> Result<()> { - info!(sl!(), "cgroup manager set memory"); + info!(sl(), "cgroup manager set memory"); let mem_controller: &MemController = cg.controller_of().unwrap(); if !update { @@ -493,7 +491,7 @@ fn set_memory_resources(cg: &cgroups::Cgroup, memory: &LinuxMemory, update: bool } fn set_pids_resources(cg: &cgroups::Cgroup, pids: &LinuxPids) -> Result<()> { - info!(sl!(), "cgroup manager set pids"); + info!(sl(), "cgroup manager set pids"); let pid_controller: &PidController = cg.controller_of().unwrap(); let v = if pids.limit > 0 { MaxValue::Value(pids.limit) @@ -962,7 +960,7 @@ pub fn get_paths() -> Result> { for l in fs::read_to_string(PATHS)?.lines() { let fl: Vec<&str> = l.split(':').collect(); if fl.len() != 3 { - info!(sl!(), "Corrupted cgroup data!"); + info!(sl(), "Corrupted cgroup data!"); continue; } @@ -983,7 +981,7 @@ pub fn get_mounts(paths: &HashMap) -> Result = p[1].split(' ').collect(); if post.len() != 3 { - warn!(sl!(), "can't parse {} line {:?}", MOUNTS, l); + warn!(sl(), "can't parse {} line {:?}", MOUNTS, l); continue; } diff --git a/src/agent/rustjail/src/cgroups/notifier.rs b/src/agent/rustjail/src/cgroups/notifier.rs index 9f91b3584f..5260a3d3f2 100644 --- a/src/agent/rustjail/src/cgroups/notifier.rs +++ b/src/agent/rustjail/src/cgroups/notifier.rs @@ -16,11 +16,9 @@ use inotify::{Inotify, WatchMask}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::{channel, Receiver}; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger().new(o!("subsystem" => "cgroups_notifier")) - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "cgroups_notifier")) } pub async fn notify_oom(cid: &str, cg_dir: String) -> Result> { @@ -38,7 +36,7 @@ pub async fn notify_oom(cid: &str, cg_dir: String) -> Result> { fn get_value_from_cgroup(path: &Path, key: &str) -> Result { let content = fs::read_to_string(path)?; info!( - sl!(), + sl(), "get_value_from_cgroup file: {:?}, content: {}", &path, &content ); @@ -67,11 +65,11 @@ async fn register_memory_event_v2( let event_control_path = Path::new(&cg_dir).join(memory_event_name); let cgroup_event_control_path = Path::new(&cg_dir).join(cgroup_event_name); info!( - sl!(), + sl(), "register_memory_event_v2 event_control_path: {:?}", &event_control_path ); info!( - sl!(), + sl(), "register_memory_event_v2 cgroup_event_control_path: {:?}", &cgroup_event_control_path ); @@ -82,8 +80,8 @@ async fn register_memory_event_v2( // Because no `unix.IN_DELETE|unix.IN_DELETE_SELF` event for cgroup file system, so watching all process exited let cg_wd = inotify.add_watch(&cgroup_event_control_path, WatchMask::MODIFY)?; - info!(sl!(), "ev_wd: {:?}", ev_wd); - info!(sl!(), "cg_wd: {:?}", cg_wd); + info!(sl(), "ev_wd: {:?}", ev_wd); + info!(sl(), "cg_wd: {:?}", cg_wd); let (sender, receiver) = channel(100); let containere_id = containere_id.to_string(); @@ -97,17 +95,17 @@ async fn register_memory_event_v2( while let Some(event_or_error) = stream.next().await { let event = event_or_error.unwrap(); info!( - sl!(), + sl(), "container[{}] get event for container: {:?}", &containere_id, &event ); // info!("is1: {}", event.wd == wd1); - info!(sl!(), "event.wd: {:?}", event.wd); + info!(sl(), "event.wd: {:?}", event.wd); if event.wd == ev_wd { let oom = get_value_from_cgroup(&event_control_path, "oom_kill"); if oom.unwrap_or(0) > 0 { let _ = sender.send(containere_id.clone()).await.map_err(|e| { - error!(sl!(), "send containere_id failed, error: {:?}", e); + error!(sl(), "send containere_id failed, error: {:?}", e); }); return; } @@ -171,13 +169,13 @@ async fn register_memory_event( let mut buf = [0u8; 8]; match eventfd_stream.read(&mut buf).await { Err(err) => { - warn!(sl!(), "failed to read from eventfd: {:?}", err); + warn!(sl(), "failed to read from eventfd: {:?}", err); return; } Ok(_) => { let content = fs::read_to_string(path.clone()); info!( - sl!(), + sl(), "cgroup event for container: {}, path: {:?}, content: {:?}", &containere_id, &path, @@ -193,7 +191,7 @@ async fn register_memory_event( } let _ = sender.send(containere_id.clone()).await.map_err(|e| { - error!(sl!(), "send containere_id failed, error: {:?}", e); + error!(sl(), "send containere_id failed, error: {:?}", e); }); } }); diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index b1d7499cd9..2964ae3776 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -1596,10 +1596,8 @@ mod tests { use tempfile::tempdir; use test_utils::skip_if_not_root; - macro_rules! sl { - () => { - slog_scope::logger() - }; + fn sl() -> slog::Logger { + slog_scope::logger() } #[test] @@ -1854,7 +1852,7 @@ mod tests { let _ = new_linux_container_and_then(|mut c: LinuxContainer| { c.processes.insert( 1, - Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap(), + Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap(), ); let p = c.get_process("123"); assert!(p.is_ok(), "Expecting Ok, Got {:?}", p); @@ -1881,7 +1879,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .start(Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap()) + .start(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } @@ -1891,7 +1889,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .run(Process::new(&sl!(), &oci::Process::default(), "123", true, 1).unwrap()) + .run(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } diff --git a/src/agent/src/device.rs b/src/agent/src/device.rs index 53fe77b1b4..0268f8c348 100644 --- a/src/agent/src/device.rs +++ b/src/agent/src/device.rs @@ -26,11 +26,9 @@ use oci::{LinuxDeviceCgroup, LinuxResources, Spec}; use protocols::agent::Device; use tracing::instrument; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger().new(o!("subsystem" => "device")) - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "device")) } const VM_ROOTFS: &str = "/"; @@ -78,7 +76,7 @@ where { let syspci = Path::new(&syspci); let drv = drv.as_ref(); - info!(sl!(), "rebind_pci_driver: {} => {:?}", dev, drv); + info!(sl(), "rebind_pci_driver: {} => {:?}", dev, drv); let devpath = syspci.join("devices").join(dev.to_string()); let overridepath = &devpath.join("driver_override"); @@ -606,7 +604,7 @@ fn update_spec_devices(spec: &mut Spec, mut updates: HashMap<&str, DevUpdate>) - let host_minor = specdev.minor; info!( - sl!(), + sl(), "update_spec_devices() updating device"; "container_path" => &specdev.path, "type" => &specdev.r#type, @@ -657,7 +655,7 @@ fn update_spec_devices(spec: &mut Spec, mut updates: HashMap<&str, DevUpdate>) - if let Some(update) = res_updates.get(&(r.r#type.as_str(), host_major, host_minor)) { info!( - sl!(), + sl(), "update_spec_devices() updating resource"; "type" => &r.r#type, "host_major" => host_major, @@ -921,7 +919,7 @@ pub async fn add_devices( #[instrument] async fn add_device(device: &Device, sandbox: &Arc>) -> Result { // log before validation to help with debugging gRPC protocol version differences. - info!(sl!(), "device-id: {}, device-type: {}, device-vm-path: {}, device-container-path: {}, device-options: {:?}", + info!(sl(), "device-id: {}, device-type: {}, device-vm-path: {}, device-container-path: {}, device-options: {:?}", device.id, device.type_, device.vm_path, device.container_path, device.options); if device.type_.is_empty() { diff --git a/src/agent/src/metrics.rs b/src/agent/src/metrics.rs index a5522c0eba..d7fc4d12bd 100644 --- a/src/agent/src/metrics.rs +++ b/src/agent/src/metrics.rs @@ -15,11 +15,9 @@ use tracing::instrument; const NAMESPACE_KATA_AGENT: &str = "kata_agent"; const NAMESPACE_KATA_GUEST: &str = "kata_guest"; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger().new(o!("subsystem" => "metrics")) - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "metrics")) } lazy_static! { @@ -139,7 +137,7 @@ fn update_agent_metrics() -> Result<()> { Ok(p) => p, Err(e) => { // FIXME: return Ok for all errors? - warn!(sl!(), "failed to create process instance: {:?}", e); + warn!(sl(), "failed to create process instance: {:?}", e); return Ok(()); } @@ -160,7 +158,7 @@ fn update_agent_metrics() -> Result<()> { // io match me.io() { Err(err) => { - info!(sl!(), "failed to get process io stat: {:?}", err); + info!(sl(), "failed to get process io stat: {:?}", err); } Ok(io) => { set_gauge_vec_proc_io(&AGENT_IO_STAT, &io); @@ -169,7 +167,7 @@ fn update_agent_metrics() -> Result<()> { match me.stat() { Err(err) => { - info!(sl!(), "failed to get process stat: {:?}", err); + info!(sl(), "failed to get process stat: {:?}", err); } Ok(stat) => { set_gauge_vec_proc_stat(&AGENT_PROC_STAT, &stat); @@ -177,7 +175,7 @@ fn update_agent_metrics() -> Result<()> { } match me.status() { - Err(err) => error!(sl!(), "failed to get process status: {:?}", err), + Err(err) => error!(sl(), "failed to get process status: {:?}", err), Ok(status) => set_gauge_vec_proc_status(&AGENT_PROC_STATUS, &status), } @@ -189,7 +187,7 @@ fn update_guest_metrics() { // try get load and task info match procfs::LoadAverage::new() { Err(err) => { - info!(sl!(), "failed to get guest LoadAverage: {:?}", err); + info!(sl(), "failed to get guest LoadAverage: {:?}", err); } Ok(load) => { GUEST_LOAD @@ -209,7 +207,7 @@ fn update_guest_metrics() { // try to get disk stats match procfs::diskstats() { Err(err) => { - info!(sl!(), "failed to get guest diskstats: {:?}", err); + info!(sl(), "failed to get guest diskstats: {:?}", err); } Ok(diskstats) => { for diskstat in diskstats { @@ -221,7 +219,7 @@ fn update_guest_metrics() { // try to get vm stats match procfs::vmstat() { Err(err) => { - info!(sl!(), "failed to get guest vmstat: {:?}", err); + info!(sl(), "failed to get guest vmstat: {:?}", err); } Ok(vmstat) => { for (k, v) in vmstat { @@ -233,7 +231,7 @@ fn update_guest_metrics() { // cpu stat match procfs::KernelStats::new() { Err(err) => { - info!(sl!(), "failed to get guest KernelStats: {:?}", err); + info!(sl(), "failed to get guest KernelStats: {:?}", err); } Ok(kernel_stats) => { set_gauge_vec_cpu_time(&GUEST_CPU_TIME, "total", &kernel_stats.total); @@ -246,7 +244,7 @@ fn update_guest_metrics() { // try to get net device stats match procfs::net::dev_status() { Err(err) => { - info!(sl!(), "failed to get guest net::dev_status: {:?}", err); + info!(sl(), "failed to get guest net::dev_status: {:?}", err); } Ok(devs) => { // netdev: map[string]procfs::net::DeviceStatus @@ -259,7 +257,7 @@ fn update_guest_metrics() { // get statistics about memory from /proc/meminfo match procfs::Meminfo::new() { Err(err) => { - info!(sl!(), "failed to get guest Meminfo: {:?}", err); + info!(sl(), "failed to get guest Meminfo: {:?}", err); } Ok(meminfo) => { set_gauge_vec_meminfo(&GUEST_MEMINFO, &meminfo); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index ef2c0f9e7f..8299ed34b6 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -110,11 +110,9 @@ const ERR_NO_SANDBOX_PIDNS: &str = "Sandbox does not have sandbox_pidns"; // not available. const IPTABLES_RESTORE_WAIT_SEC: u64 = 5; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger() - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger() } // Convenience function to wrap an error and response to ttrpc client @@ -158,14 +156,14 @@ impl AgentService { let mut oci = match oci_spec.as_mut() { Some(spec) => rustjail::grpc_to_oci(spec), None => { - error!(sl!(), "no oci spec in the create container request!"); + error!(sl(), "no oci spec in the create container request!"); return Err(anyhow!(nix::Error::EINVAL)); } }; - info!(sl!(), "receive createcontainer, spec: {:?}", &oci); + info!(sl(), "receive createcontainer, spec: {:?}", &oci); info!( - sl!(), + sl(), "receive createcontainer, storages: {:?}", &req.storages ); @@ -184,7 +182,7 @@ impl AgentService { // here, the agent will rely on rustjail (using the oci.Mounts // list) to bind mount all of them inside the container. let m = add_storages( - sl!(), + sl(), req.storages.to_vec(), self.sandbox.clone(), Some(req.container_id.clone()), @@ -232,33 +230,33 @@ impl AgentService { }; let mut ctr: LinuxContainer = - LinuxContainer::new(cid.as_str(), CONTAINER_BASE, opts, &sl!())?; + LinuxContainer::new(cid.as_str(), CONTAINER_BASE, opts, &sl())?; let pipe_size = AGENT_CONFIG.container_pipe_size; let p = if let Some(p) = oci.process { - Process::new(&sl!(), &p, cid.as_str(), true, pipe_size)? + Process::new(&sl(), &p, cid.as_str(), true, pipe_size)? } else { - info!(sl!(), "no process configurations!"); + info!(sl(), "no process configurations!"); return Err(anyhow!(nix::Error::EINVAL)); }; // if starting container failed, we will do some rollback work // to ensure no resources are leaked. if let Err(err) = ctr.start(p).await { - error!(sl!(), "failed to start container: {:?}", err); + error!(sl(), "failed to start container: {:?}", err); if let Err(e) = ctr.destroy().await { - error!(sl!(), "failed to destroy container: {:?}", e); + error!(sl(), "failed to destroy container: {:?}", e); } if let Err(e) = remove_container_resources(&mut s, &cid) { - error!(sl!(), "failed to remove container resources: {:?}", e); + error!(sl(), "failed to remove container resources: {:?}", e); } return Err(err); } s.update_shared_pidns(&ctr)?; s.add_container(ctr); - info!(sl!(), "created container!"); + info!(sl(), "created container!"); Ok(()) } @@ -355,7 +353,7 @@ impl AgentService { let cid = req.container_id.clone(); let exec_id = req.exec_id.clone(); - info!(sl!(), "do_exec_process cid: {} eid: {}", cid, exec_id); + info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id); let s = self.sandbox.clone(); let mut sandbox = s.lock().await; @@ -370,7 +368,7 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); - let p = Process::new(&sl!(), &ocip, exec_id.as_str(), false, pipe_size)?; + let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size)?; let ctr = sandbox .get_container(&cid) @@ -388,7 +386,7 @@ impl AgentService { let s = self.sandbox.clone(); info!( - sl!(), + sl(), "signal process"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -410,7 +408,7 @@ impl AgentService { match p.signal(sig) { Err(Errno::ESRCH) => { info!( - sl!(), + sl(), "signal encounter ESRCH, continue"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -426,7 +424,7 @@ impl AgentService { if eid.is_empty() { // eid is empty, signal all the remaining processes in the container cgroup info!( - sl!(), + sl(), "signal all the remaining processes"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -434,7 +432,7 @@ impl AgentService { if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await { warn!( - sl!(), + sl(), "freeze cgroup failed"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -447,7 +445,7 @@ impl AgentService { let res = unsafe { libc::kill(*pid, sig) }; if let Err(err) = Errno::result(res).map(drop) { warn!( - sl!(), + sl(), "signal failed"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -458,7 +456,7 @@ impl AgentService { } if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Thawed).await { warn!( - sl!(), + sl(), "unfreeze cgroup failed"; "container-id" => cid.clone(), "exec-id" => eid.clone(), @@ -503,7 +501,7 @@ impl AgentService { let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100); info!( - sl!(), + sl(), "wait process"; "container-id" => cid.clone(), "exec-id" => eid.clone() @@ -520,9 +518,9 @@ impl AgentService { }; if let Some(mut exit_rx) = exit_rx { - info!(sl!(), "cid {} eid {} waiting for exit signal", &cid, &eid); + info!(sl(), "cid {} eid {} waiting for exit signal", &cid, &eid); while exit_rx.changed().await.is_ok() {} - info!(sl!(), "cid {} eid {} received exit signal", &cid, &eid); + info!(sl(), "cid {} eid {} received exit signal", &cid, &eid); } let mut sandbox = s.lock().await; @@ -840,8 +838,8 @@ impl agent_ttrpc::AgentService for AgentService { // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more. // More details in https://github.com/kata-containers/kata-containers/issues/6455#issuecomment-1477137277 match stat::stat(Path::new(&m.mount_point)) { - Ok(_) => info!(sl!(), "stat {} success", m.mount_point), - Err(e) => info!(sl!(), "stat {} failed: {}", m.mount_point, e), + Ok(_) => info!(sl(), "stat {} success", m.mount_point), + Err(e) => info!(sl(), "stat {} failed: {}", m.mount_point, e), } } } @@ -1024,7 +1022,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "update_mounts", req); is_allowed(&req)?; - match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await { + match update_ephemeral_mounts(sl(), req.storages.to_vec(), self.sandbox.clone()).await { Ok(_) => Ok(Empty::new()), Err(e) => Err(ttrpc_error( ttrpc::Code::INTERNAL, @@ -1041,7 +1039,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "get_iptables", req); is_allowed(&req)?; - info!(sl!(), "get_ip_tables: request received"); + info!(sl(), "get_ip_tables: request received"); // the binary could exists in either /usr/sbin or /sbin // here check both of the places and return the one exists @@ -1066,7 +1064,7 @@ impl agent_ttrpc::AgentService for AgentService { ..Default::default() }), Err(e) => { - warn!(sl!(), "failed to run {}: {:?}", cmd, e.kind()); + warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()); return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } @@ -1080,7 +1078,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "set_iptables", req); is_allowed(&req)?; - info!(sl!(), "set_ip_tables request received"); + info!(sl(), "set_ip_tables request received"); // the binary could exists in both /usr/sbin and /sbin // here check both of the places and return the one exists @@ -1109,7 +1107,7 @@ impl agent_ttrpc::AgentService for AgentService { { Ok(child) => child, Err(e) => { - warn!(sl!(), "failure to spawn {}: {:?}", cmd, e.kind()); + warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()); return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1130,12 +1128,12 @@ impl agent_ttrpc::AgentService for AgentService { let _ = match stdin.write_all(&req.data) { Ok(o) => o, Err(e) => { - warn!(sl!(), "error writing stdin: {:?}", e.kind()); + warn!(sl(), "error writing stdin: {:?}", e.kind()); return; } }; if tx.send(1).is_err() { - warn!(sl!(), "stdin writer thread receiver dropped"); + warn!(sl(), "stdin writer thread receiver dropped"); }; }); @@ -1160,7 +1158,7 @@ impl agent_ttrpc::AgentService for AgentService { Ok(o) => o, Err(e) => { warn!( - sl!(), + sl(), "failure waiting for spawned {} to complete: {:?}", cmd, e.kind() @@ -1170,7 +1168,7 @@ impl agent_ttrpc::AgentService for AgentService { }; if !output.status.success() { - warn!(sl!(), "{} failed: {:?}", cmd, output.stderr); + warn!(sl(), "{} failed: {:?}", cmd, output.stderr); return Err(ttrpc_error( ttrpc::Code::INTERNAL, format!( @@ -1259,7 +1257,7 @@ impl agent_ttrpc::AgentService for AgentService { if !req.guest_hook_path.is_empty() { let _ = s.add_hooks(&req.guest_hook_path).map_err(|e| { error!( - sl!(), + sl(), "add guest hook {} failed: {:?}", req.guest_hook_path, e ); }); @@ -1278,7 +1276,7 @@ impl agent_ttrpc::AgentService for AgentService { .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } - match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone(), None).await { + match add_storages(sl(), req.storages.to_vec(), self.sandbox.clone(), None).await { Ok(m) => { let sandbox = self.sandbox.clone(); let mut s = sandbox.lock().await; @@ -1287,7 +1285,7 @@ impl agent_ttrpc::AgentService for AgentService { Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), }; - match setup_guest_dns(sl!(), req.dns.to_vec()) { + match setup_guest_dns(sl(), req.dns.to_vec()) { Ok(_) => { let sandbox = self.sandbox.clone(); let mut s = sandbox.lock().await; @@ -1412,7 +1410,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "get_guest_details", req); is_allowed(&req)?; - info!(sl!(), "get guest details!"); + info!(sl(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); // to get memory block size match get_memory_info( @@ -1426,7 +1424,7 @@ impl agent_ttrpc::AgentService for AgentService { resp.support_mem_hotplug_probe = v; } Err(e) => { - info!(sl!(), "fail to get memory info!"); + info!(sl(), "fail to get memory info!"); return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } @@ -1511,7 +1509,7 @@ impl agent_ttrpc::AgentService for AgentService { drop(sandbox); if let Some(container_id) = event_rx.recv().await { - info!(sl!(), "get_oom_event return {}", &container_id); + info!(sl(), "get_oom_event return {}", &container_id); let mut resp = OOMEvent::new(); resp.container_id = container_id; @@ -1530,7 +1528,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "get_volume_stats", req); is_allowed(&req)?; - info!(sl!(), "get volume stats!"); + info!(sl(), "get volume stats!"); let mut resp = VolumeStatsResponse::new(); let mut condition = VolumeCondition::new(); @@ -1541,7 +1539,7 @@ impl agent_ttrpc::AgentService for AgentService { condition.message = String::from("OK"); } Err(e) => { - info!(sl!(), "failed to open the volume"); + info!(sl(), "failed to open the volume"); return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } }; @@ -1600,7 +1598,7 @@ impl health_ttrpc::Health for HealthService { _ctx: &TtrpcContext, req: protocols::health::CheckRequest, ) -> ttrpc::Result { - info!(sl!(), "version {:?}", req); + info!(sl(), "version {:?}", req); let mut rep = protocols::health::VersionCheckResponse::new(); rep.agent_version = AGENT_VERSION.to_string(); rep.grpc_version = API_VERSION.to_string(); @@ -1621,17 +1619,17 @@ fn get_memory_info( match fs::read_to_string(block_size_path) { Ok(v) => { if v.is_empty() { - warn!(sl!(), "file {} is empty", block_size_path); + warn!(sl(), "file {} is empty", block_size_path); return Err(anyhow!(ERR_INVALID_BLOCK_SIZE)); } size = u64::from_str_radix(v.trim(), 16).map_err(|_| { - warn!(sl!(), "failed to parse the str {} to hex", size); + warn!(sl(), "failed to parse the str {} to hex", size); anyhow!(ERR_INVALID_BLOCK_SIZE) })?; } Err(e) => { - warn!(sl!(), "memory block size error: {:?}", e.kind()); + warn!(sl(), "memory block size error: {:?}", e.kind()); if e.kind() != std::io::ErrorKind::NotFound { return Err(anyhow!(e)); } @@ -1643,7 +1641,7 @@ fn get_memory_info( match stat::stat(hotplug_probe_path) { Ok(_) => plug = true, Err(e) => { - warn!(sl!(), "hotplug memory error: {:?}", e); + warn!(sl(), "hotplug memory error: {:?}", e); match e { nix::Error::ENOENT => plug = false, _ => return Err(anyhow!(e)), @@ -1739,7 +1737,7 @@ pub fn start(s: Arc>, server_address: &str, init_mode: bool) -> R .register_service(aservice) .register_service(hservice); - info!(sl!(), "ttRPC server started"; "address" => server_address); + info!(sl(), "ttRPC server started"; "address" => server_address); Ok(server) } @@ -1814,7 +1812,7 @@ fn remove_container_resources(sandbox: &mut Sandbox, cid: &str) -> Result<()> { for m in cmounts.iter() { if let Err(err) = sandbox.unset_and_remove_sandbox_storage(m) { error!( - sl!(), + sl(), "failed to unset_and_remove_sandbox_storage for container {}, error: {:?}", cid, err @@ -1850,7 +1848,7 @@ fn is_signal_handled(proc_status_file: &str, signum: u32) -> bool { return fs::metadata(proc_status_file).is_ok(); } else if signum > 64 { // Ensure invalid signum won't break bit shift logic - warn!(sl!(), "received invalid signum {}", signum); + warn!(sl(), "received invalid signum {}", signum); return false; } else { (signum - 1).into() @@ -1860,7 +1858,7 @@ fn is_signal_handled(proc_status_file: &str, signum: u32) -> bool { let file = match File::open(proc_status_file) { Ok(f) => f, Err(_) => { - warn!(sl!(), "failed to open file {}", proc_status_file); + warn!(sl(), "failed to open file {}", proc_status_file); return false; } }; @@ -2013,7 +2011,7 @@ pub fn setup_bundle(cid: &str, spec: &mut Spec) -> Result { "bind", MsFlags::MS_BIND, "", - &sl!(), + &sl(), )?; let rootfs_path_name = rootfs_path @@ -2048,7 +2046,7 @@ fn load_kernel_module(module: &protocols::agent::KernelModule) -> Result<()> { } info!( - sl!(), + sl(), "load_kernel_module {}: {:?}", module.name, module.parameters ); @@ -2834,7 +2832,7 @@ OtherField:other for cmd in iptables_cmd_list { if !check_command(cmd) { warn!( - sl!(), + sl(), "one or more commands for ip tables test are missing, skip it" ); return; diff --git a/src/agent/src/tracer.rs b/src/agent/src/tracer.rs index bad4a6f509..1199b601c9 100644 --- a/src/agent/src/tracer.rs +++ b/src/agent/src/tracer.rs @@ -69,7 +69,7 @@ macro_rules! trace_rpc_call { propagator.extract(&extract_carrier_from_ttrpc($ctx)) }); - info!(sl!(), "rpc call from shim to agent: {:?}", $name); + info!(sl(), "rpc call from shim to agent: {:?}", $name); // generate tracing span let rpc_span = span!(tracing::Level::INFO, $name, "mod"="rpc.rs", req=?$req); diff --git a/src/agent/src/uevent.rs b/src/agent/src/uevent.rs index cabdd6235c..53b7c103dc 100644 --- a/src/agent/src/uevent.rs +++ b/src/agent/src/uevent.rs @@ -19,11 +19,9 @@ use tokio::sync::watch::Receiver; use tokio::sync::Mutex; use tracing::instrument; -// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger().new(o!("subsystem" => "uevent")) - }; +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "uevent")) } #[derive(Debug, Default, Clone, PartialEq, Eq)] @@ -120,11 +118,11 @@ pub async fn wait_for_uevent( ) -> Result { let logprefix = format!("Waiting for {:?}", &matcher); - info!(sl!(), "{}", logprefix); + info!(sl(), "{}", logprefix); let mut sb = sandbox.lock().await; for uev in sb.uevent_map.values() { if matcher.is_match(uev) { - info!(sl!(), "{}: found {:?} in uevent map", logprefix, &uev); + info!(sl(), "{}: found {:?} in uevent map", logprefix, &uev); return Ok(uev.clone()); } } @@ -139,7 +137,7 @@ pub async fn wait_for_uevent( sb.uevent_watchers.push(Some((Box::new(matcher), tx))); drop(sb); // unlock - info!(sl!(), "{}: waiting on channel", logprefix); + info!(sl(), "{}: waiting on channel", logprefix); let hotplug_timeout = AGENT_CONFIG.hotplug_timeout; @@ -157,7 +155,7 @@ pub async fn wait_for_uevent( } }; - info!(sl!(), "{}: found {:?} on channel", logprefix, &uev); + info!(sl(), "{}: found {:?} on channel", logprefix, &uev); Ok(uev) }