diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index ed7729855b..e9081c8df9 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -124,15 +124,15 @@ macro_rules! ttrpc_error { }; } -macro_rules! is_allowed { - ($req:ident) => { - if !AGENT_CONFIG.is_allowed_endpoint($req.descriptor_dyn().name()) { - return Err(ttrpc_error!( - ttrpc::Code::UNIMPLEMENTED, - format!("{} is blocked", $req.descriptor_dyn().name()), - )); - } - }; +fn is_allowed(req: &impl MessageDyn) -> ttrpc::Result<()> { + if !AGENT_CONFIG.is_allowed_endpoint(req.descriptor_dyn().name()) { + Err(ttrpc_error!( + ttrpc::Code::UNIMPLEMENTED, + format!("{} is blocked", req.descriptor_dyn().name()), + )) + } else { + Ok(()) + } } #[derive(Clone, Debug)] @@ -650,7 +650,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "create_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_create_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -663,7 +663,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "start_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_start_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -676,7 +676,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_container", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_remove_container(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), @@ -690,7 +690,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "exec_process", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_exec_process(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -703,7 +703,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "signal_process", req); - is_allowed!(req); + is_allowed(&req)?; match self.do_signal_process(req).await { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -716,7 +716,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "wait_process", req); - is_allowed!(req); + is_allowed(&req)?; self.do_wait_process(req) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -728,7 +728,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let res = req.resources; @@ -764,7 +764,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::StatsContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "stats_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -786,7 +786,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::PauseContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "pause_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -810,7 +810,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ResumeContainerRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "resume_container", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id(); let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -834,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::RemoveStaleVirtiofsShareMountsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req); - is_allowed!(req); + is_allowed(&req)?; let mount_infos = parse_mount_table("/proc/self/mountinfo") .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; for m in &mount_infos { @@ -856,7 +856,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::WriteStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_write_stream(req) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -867,7 +867,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_read_stream(req, true) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -878,7 +878,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; self.do_read_stream(req, false) .await .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e)) @@ -890,7 +890,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "close_stdin", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let eid = req.exec_id; @@ -917,7 +917,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::TtyWinResizeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "tty_win_resize", req); - is_allowed!(req); + is_allowed(&req)?; let cid = req.container_id.clone(); let eid = req.exec_id.clone(); @@ -959,7 +959,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_interface", req); - is_allowed!(req); + is_allowed(&req)?; let interface = req.interface.into_option().ok_or_else(|| { ttrpc_error!( @@ -987,7 +987,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_routes", req); - is_allowed!(req); + is_allowed(&req)?; let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| { ttrpc_error!( @@ -1024,7 +1024,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::UpdateEphemeralMountsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_mounts", req); - is_allowed!(req); + is_allowed(&req)?; match update_ephemeral_mounts(sl!(), req.storages.to_vec(), self.sandbox.clone()).await { Ok(_) => Ok(Empty::new()), @@ -1041,7 +1041,7 @@ impl agent_ttrpc::AgentService for AgentService { req: GetIPTablesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_iptables", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get_ip_tables: request received"); @@ -1080,7 +1080,7 @@ impl agent_ttrpc::AgentService for AgentService { req: SetIPTablesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "set_iptables", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "set_ip_tables request received"); @@ -1195,7 +1195,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "list_interfaces", req); - is_allowed!(req); + is_allowed(&req)?; let list = self .sandbox @@ -1223,7 +1223,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "list_routes", req); - is_allowed!(req); + is_allowed(&req)?; let list = self .sandbox @@ -1246,7 +1246,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CreateSandboxRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "create_sandbox", req); - is_allowed!(req); + is_allowed(&req)?; { let sandbox = self.sandbox.clone(); @@ -1311,7 +1311,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::DestroySandboxRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "destroy_sandbox", req); - is_allowed!(req); + is_allowed(&req)?; let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().await; @@ -1346,7 +1346,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "add_arp_neighbors", req); - is_allowed!(req); + is_allowed(&req)?; let neighs = req .neighbors @@ -1380,7 +1380,7 @@ impl agent_ttrpc::AgentService for AgentService { ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; let s = Arc::clone(&self.sandbox); let sandbox = s.lock().await; trace_rpc_call!(ctx, "online_cpu_mem", req); @@ -1398,7 +1398,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "reseed_random_dev", req); - is_allowed!(req); + is_allowed(&req)?; random::reseed_rng(req.data.as_slice()) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1412,7 +1412,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::GuestDetailsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_guest_details", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get guest details!"); let mut resp = GuestDetailsResponse::new(); @@ -1446,7 +1446,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "mem_hotplug_by_probe", req); - is_allowed!(req); + is_allowed(&req)?; do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1460,7 +1460,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "set_guest_date_time", req); - is_allowed!(req); + is_allowed(&req)?; do_set_guest_date_time(req.Sec, req.Usec) .map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1474,7 +1474,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "copy_file", req); - is_allowed!(req); + is_allowed(&req)?; do_copy_file(&req).map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?; @@ -1487,7 +1487,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_metrics", req); - is_allowed!(req); + is_allowed(&req)?; match get_metrics(&req) { Err(e) => Err(ttrpc_error!(ttrpc::Code::INTERNAL, e)), @@ -1504,7 +1504,7 @@ impl agent_ttrpc::AgentService for AgentService { _ctx: &TtrpcContext, req: protocols::agent::GetOOMEventRequest, ) -> ttrpc::Result { - is_allowed!(req); + is_allowed(&req)?; let sandbox = self.sandbox.clone(); let s = sandbox.lock().await; let event_rx = &s.event_rx.clone(); @@ -1530,7 +1530,7 @@ impl agent_ttrpc::AgentService for AgentService { req: VolumeStatsRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "get_volume_stats", req); - is_allowed!(req); + is_allowed(&req)?; info!(sl!(), "get volume stats!"); let mut resp = VolumeStatsResponse::new(); @@ -1571,7 +1571,7 @@ impl agent_ttrpc::AgentService for AgentService { req: protocols::agent::AddSwapRequest, ) -> ttrpc::Result { trace_rpc_call!(ctx, "add_swap", req); - is_allowed!(req); + is_allowed(&req)?; do_add_swap(&self.sandbox, &req) .await